From 7d197dbe4be46b63377936610ea0d01572c0a930 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 20 May 2022 11:08:46 +0200 Subject: [PATCH] cotinued sched service test --- config/hook_implementation.py | 28 ++-- pus_tc/cmd_definitions.py | 13 +- pus_tc/pus_11_tc_sched.py | 143 ++++++++++++++++++ pus_tc/{service_17_test.py => pus_17_test.py} | 0 .../{service_200_mode.py => pus_200_mode.py} | 0 pus_tc/tc_packing.py | 12 +- tmtcc.py | 7 +- 7 files changed, 181 insertions(+), 22 deletions(-) create mode 100644 pus_tc/pus_11_tc_sched.py rename pus_tc/{service_17_test.py => pus_17_test.py} (100%) rename pus_tc/{service_200_mode.py => pus_200_mode.py} (100%) diff --git a/config/hook_implementation.py b/config/hook_implementation.py index 3ebdb3e..1e687cd 100644 --- a/config/hook_implementation.py +++ b/config/hook_implementation.py @@ -1,24 +1,33 @@ -from typing import Dict, Optional - +from typing import Optional, Union from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.config.definitions import ServiceOpCodeDictT from tmtccmd.config.hook import TmTcHookBase from tmtccmd.core.backend import TmTcHandler from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.utility.obj_id import ObjectIdDictT from tmtccmd.utility.retval import RetvalDictT from common_tmtc.config.definitions import TM_SP_IDS -from common_tmtc.pus_tc.cmd_definitions import get_fsfw_service_op_code_dict +from common_tmtc.pus_tc.cmd_definitions import common_fsfw_service_op_code_dict -class FsfwHookBase(TmTcHookBase): +class CommonFsfwHookBase(TmTcHookBase): + def pack_service_queue( + self, service: Union[int, str], op_code: str, service_queue: TcQueueT + ): + from common_tmtc.pus_tc.tc_packing import common_service_queue_user + + common_service_queue_user( + service=service, op_code=op_code, tc_queue=service_queue + ) + def __init__(self, json_cfg_path: str): super().__init__(json_cfg_path=json_cfg_path) def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: - return get_fsfw_service_op_code_dict() + return common_fsfw_service_op_code_dict() def assign_communication_interface( self, com_if_key: str @@ -34,14 +43,7 @@ class FsfwHookBase(TmTcHookBase): def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int): print("No custom mode operation implemented") - def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT): - from common_tmtc.pus_tc.tc_packing import pack_service_queue_user - - pack_service_queue_user( - service=service, op_code=op_code, tc_queue=service_queue - ) - - def get_object_ids(self) -> Dict[bytes, list]: + def get_object_ids(self) -> ObjectIdDictT: from common_tmtc.config.object_ids import get_object_ids return get_object_ids() diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 479f418..c3ea65f 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -7,7 +7,7 @@ from tmtccmd.config import ( from tmtccmd.config.globals import get_default_service_op_code_dict -def get_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: +def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: service_op_code_dict = get_default_service_op_code_dict() op_code = dict() add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test") @@ -20,7 +20,16 @@ def get_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: add_service_op_code_entry( srv_op_code_dict=service_op_code_dict, name="200", - info="Mode MGMT", + info="PUS Service 200 Mode MGMT", + op_code_entry=op_code, + ) + + op_code = dict() + add_op_code_entry(op_code_dict=op_code, keys="test", info="TC Scheduling Test") + add_service_op_code_entry( + srv_op_code_dict=service_op_code_dict, + name="11", + info="PUS Service 11 TC Scheduling", op_code_entry=op_code, ) return service_op_code_dict diff --git a/pus_tc/pus_11_tc_sched.py b/pus_tc/pus_11_tc_sched.py new file mode 100644 index 0000000..27a7fb9 --- /dev/null +++ b/pus_tc/pus_11_tc_sched.py @@ -0,0 +1,143 @@ +import struct +import time + +from spacepackets.ecss.tc import PusTelecommand + +from tmtccmd.config import QueueCommands +from tmtccmd.pus.pus_17_test import pack_service_17_ping_command +from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId +from tmtccmd.tc.definitions import TcQueueT + + +class OpCodes: + INSERT = ["0", "insert-test"] + DELETE = ["1", "del-test"] + + +def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): + if op_code in OpCodes.INSERT: + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Time-Tagged Command insertion") + ) + current_time = int(round(time.time())) + + # these TC[17,1] (ping commands) shall be inserted + ping_tcs = [ + pack_service_17_ping_command(1701), + pack_service_17_ping_command(1702), + pack_service_17_ping_command(1703), + pack_service_17_ping_command(1704), + pack_service_17_ping_command(1705), + ] + for idx, tc in enumerate(ping_tcs): + release_time = current_time + 20 + idx * 10 + tc = PusTelecommand( + service=11, + subservice=Subservices.TC_INSERT, + app_data=pack_insert_tc_app_data(release_time, tc), + ) + tc_queue.appendleft(tc.pack_command_tuple()) + if op_code in ["0", "del-test"]: + tc_queue.appendleft( + (QueueCommands.PRINT, "Testing Time-Tagged Command deletion") + ) + current_time = int(round(time.time())) + tc_to_schedule = pack_service_17_ping_command(1703) + time_tagged_tc = PusTelecommand( + service=11, + subservice=Subservices.TC_INSERT, + app_data=pack_insert_tc_app_data(current_time + 20, tc_to_schedule), + ) + tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) + del_time_tagged_tcs = PusTelecommand( + service=11, + subservice=Subservices.TC_DELETE, + app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule), + ssc=1105, + ) + + # a TC[11,5] for 3rd inserted ping TC + tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple()) + # TODO: This should be an independent test + # a TC[11,6] for some other previously inserted TCs + # service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG, + # current_time+45, current_time+55) + # tc_queue.appendleft(service_11_6_tc.pack_command_tuple()) + + # TODO: This should be an independent test + # a TC[11,7] for another previously inserted TC + # service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4) + # tc_queue.appendleft(service_11_7_tc.pack_command_tuple()) + + # TODO: This should be an independent test + # a TC[11,8] with offset time of 20s + # service_11_8_tc = build_filter_timeshift_tc( + # 20, + # TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG, + # current_time + 45, + # current_time + 55, + # ) + # tc_queue.appendleft((service_11_8_tc.pack_command_tuple())) + + +# this function packs another TC into an insert activity TC[11,4] +# parameter: release_time: Absolute time when TC shall be released/run +# parameter tc_to_insert: The TC which shall be inserted +def pack_insert_tc_app_data(release_time: int, tc_to_insert: PusTelecommand) -> bytes: + app_data = bytearray() + # pack the release time + app_data.extend(struct.pack("!I", release_time)) + # followed by the tc + app_data.extend(tc_to_insert.pack()) + return app_data + + +def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes: + return TcSchedReqId.build_from_tc(tc).pack() + + +def build_filter_delete_tc( + time_window_type: TypeOfTimeWindow, *timestamps: int +) -> PusTelecommand: + app_data = bytearray() + app_data.extend(struct.pack("!I", int(time_window_type))) + + if time_window_type != TypeOfTimeWindow.SELECT_ALL: + for timestamp in timestamps: + app_data.extend(struct.pack("!I", timestamp)) + + return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161) + + +def pack_corresponding_timeshift_app_data( + time_delta: bytes, tc: PusTelecommand, ssc: int +) -> bytes: + req_id = TcSchedReqId.build_from_tc(tc) + app_data = bytearray() + app_data.extend(time_delta) + app_data.extend(req_id.pack()) + return app_data + + +def build_filter_timeshift_tc( + time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int +) -> PusTelecommand: + app_data = bytearray() + app_data.extend(struct.pack("!I", time_offset)) + app_data.extend(struct.pack("!I", int(time_window_type))) + + if time_window_type != TypeOfTimeWindow.SELECT_ALL: + for timestamp in timestamps: + app_data.extend(struct.pack("!I", timestamp)) + + return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181) + + +# waits for a specified amount of seconds and prints ". . ." for each second +def wait_seconds(t: int): + print("Waiting: ", end="") + for x in range(t): + time.sleep(1) + print(". ", end="") + + print("") diff --git a/pus_tc/service_17_test.py b/pus_tc/pus_17_test.py similarity index 100% rename from pus_tc/service_17_test.py rename to pus_tc/pus_17_test.py diff --git a/pus_tc/service_200_mode.py b/pus_tc/pus_200_mode.py similarity index 100% rename from pus_tc/service_200_mode.py rename to pus_tc/pus_200_mode.py diff --git a/pus_tc/tc_packing.py b/pus_tc/tc_packing.py index 25daebb..5ff2cb4 100644 --- a/pus_tc/tc_packing.py +++ b/pus_tc/tc_packing.py @@ -9,6 +9,8 @@ from collections import deque from typing import Union from spacepackets.ecss.tc import PusTelecommand + +from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.logging import get_console_logger, get_current_time_string from tmtccmd.logging.pus import log_raw_pus_tc @@ -20,9 +22,9 @@ from tmtccmd.pus.pus_17_test import pack_generic_service17_test from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into -from common_tmtc.pus_tc.service_17_test import pack_service_17_commands +from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into -from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into +from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into LOGGER = get_console_logger() @@ -47,7 +49,9 @@ def pre_tc_send_cb( file_logger.info(queue_info) -def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT): +def common_service_queue_user( + service: Union[str, int], op_code: str, tc_queue: TcQueueT +): if service == CoreServiceList.SERVICE_2.value: return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue) if service == CoreServiceList.SERVICE_3.value: @@ -56,6 +60,8 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: Tc return pack_generic_service5_test_into(tc_queue=tc_queue) if service == CoreServiceList.SERVICE_8.value: return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue) + if service == CoreServiceList.SERVICE_11.value: + return pack_service_11_commands(op_code=op_code, tc_queue=tc_queue) if service == CoreServiceList.SERVICE_17.value: return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0) if service == CoreServiceList.SERVICE_20.value: diff --git a/tmtcc.py b/tmtcc.py index 72b3529..b59de44 100644 --- a/tmtcc.py +++ b/tmtcc.py @@ -33,20 +33,19 @@ except ImportError as error: from common_tmtc.config import __version__ from common_tmtc.config.definitions import PUS_APID -from common_tmtc.config.hook_implementation import FsfwHookBase +from common_tmtc.config.hook_implementation import CommonFsfwHookBase from common_tmtc.pus_tm.factory_hook import ccsds_tm_handler from common_tmtc.pus_tc.tc_packing import pre_tc_send_cb -def tmtcc_pre_args() -> FsfwHookBase: +def tmtcc_pre_args(): print(f"-- eive tmtc v{__version__} --") print(f"-- spacepackets v{spacepackets.__version__} --") tmtccmd.init_printout(False) - return FsfwHookBase(json_cfg_path=default_json_path()) def tmtcc_post_args( - hook_obj: FsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace] + hook_obj: CommonFsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace] ): setup_args = SetupArgs( hook_obj=hook_obj, use_gui=use_gui, apid=PUS_APID, cli_args=args