import struct import time from spacepackets.ecss.tc import PusTelecommand from tmtccmd.config import ( QueueCommands, ServiceOpCodeDictT, add_op_code_entry, generate_op_code_options, add_service_op_code_entry, ) from tmtccmd.pus.pus_17_test import pack_service_17_ping_command from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId from tmtccmd.tc.pus_11_tc_sched import ( generate_enable_tc_sched_cmd, generate_disable_tc_sched_cmd, generate_reset_tc_sched_cmd, pack_time_tagged_tc_app_data, ) from tmtccmd.tc.definitions import TcQueueT class OpCodes: ENABLE = ["0", "enable"] DISABLE = ["1", "disable"] RESET = ["2", "reset"] TEST_INSERT = ["12", "test-insert"] TEST_DELETE = ["13", "test-del"] TEST_RESET = ["14", "test-clear"] class Info: ENABLE = "Enable TC scheduling" DISABLE = "Disable TC scheduling" RESET = "Reset TC scheduling" TEST_INSERT = "Test TC scheduling insertion" TEST_DELETE = "Test TC scheduling deletion" TEST_RESET = "Test TC scheduling reset command" def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): op_code = dict() add_op_code_entry( op_code_dict=op_code, keys=OpCodes.ENABLE, info=Info.ENABLE, options=generate_op_code_options(custom_timeout=2.0), ) add_op_code_entry( op_code_dict=op_code, keys=OpCodes.DISABLE, info=Info.DISABLE, options=generate_op_code_options(custom_timeout=2.0), ) add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET) add_op_code_entry( op_code_dict=op_code, keys=OpCodes.TEST_INSERT, info=Info.TEST_INSERT, options=generate_op_code_options(custom_timeout=0.2), ) add_op_code_entry( op_code_dict=op_code, keys=OpCodes.TEST_DELETE, info=Info.TEST_DELETE, options=generate_op_code_options(custom_timeout=0.2), ) add_op_code_entry( op_code_dict=op_code, keys=OpCodes.TEST_RESET, info=Info.TEST_RESET, options=generate_op_code_options(custom_timeout=0.2), ) add_service_op_code_entry( srv_op_code_dict=cmd_dict, name="11", info="PUS Service 11 TC Scheduling", op_code_entry=op_code, ) def __generic_pack_three_time_tagged_cmds(tc_queue: TcQueueT): tc_queue.appendleft((QueueCommands.PRINT, "Testing Time-Tagged Command insertion")) tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) current_time = int(round(time.time())) # these TC[17,1] (ping commands) shall be inserted ping_tcs = [ pack_service_17_ping_command(1701), pack_service_17_ping_command(1702), pack_service_17_ping_command(1703), ] for idx, tc in enumerate(ping_tcs): release_time = current_time + (idx + 2) * 5 time_tagged_tc = PusTelecommand( service=11, subservice=Subservices.TC_INSERT, app_data=pack_time_tagged_tc_app_data(struct.pack("!I", release_time), tc), ) tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) tc_queue.appendleft((QueueCommands.WAIT, 25)) def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): if op_code in OpCodes.TEST_INSERT: tc_queue.appendleft( (QueueCommands.PRINT, "Testing Time-Tagged Command deletion") ) __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) if op_code in OpCodes.TEST_DELETE: current_time = int(round(time.time())) tc_to_schedule = pack_service_17_ping_command(1703) time_tagged_tc = PusTelecommand( service=11, subservice=Subservices.TC_INSERT, app_data=pack_time_tagged_tc_app_data( struct.pack("!I", current_time + 20), tc_to_schedule ), ) tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) del_time_tagged_tcs = PusTelecommand( service=11, subservice=Subservices.TC_DELETE, app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule), ssc=1105, ) tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple()) if op_code in OpCodes.ENABLE: tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler")) tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) if op_code in OpCodes.DISABLE: tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler")) tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple()) if op_code in OpCodes.RESET: tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler")) tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) if op_code in OpCodes.TEST_RESET: tc_queue.appendleft((QueueCommands.PRINT, "Testing Reset command")) __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) # a TC[11,5] for 3rd inserted ping TC # TODO: This should be an independent test # a TC[11,6] for some other previously inserted TCs # service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG, # current_time+45, current_time+55) # tc_queue.appendleft(service_11_6_tc.pack_command_tuple()) # TODO: This should be an independent test # a TC[11,7] for another previously inserted TC # service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4) # tc_queue.appendleft(service_11_7_tc.pack_command_tuple()) # TODO: This should be an independent test # a TC[11,8] with offset time of 20s # service_11_8_tc = build_filter_timeshift_tc( # 20, # TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG, # current_time + 45, # current_time + 55, # ) # tc_queue.appendleft((service_11_8_tc.pack_command_tuple())) def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes: return TcSchedReqId.build_from_tc(tc).pack() def build_filter_delete_tc( time_window_type: TypeOfTimeWindow, *timestamps: int ) -> PusTelecommand: app_data = bytearray() app_data.extend(struct.pack("!I", int(time_window_type))) if time_window_type != TypeOfTimeWindow.SELECT_ALL: for timestamp in timestamps: app_data.extend(struct.pack("!I", timestamp)) return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161) def pack_corresponding_timeshift_app_data( time_delta: bytes, tc: PusTelecommand, ssc: int ) -> bytes: req_id = TcSchedReqId.build_from_tc(tc) app_data = bytearray() app_data.extend(time_delta) app_data.extend(req_id.pack()) return app_data def build_filter_timeshift_tc( time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int ) -> PusTelecommand: app_data = bytearray() app_data.extend(struct.pack("!I", time_offset)) app_data.extend(struct.pack("!I", int(time_window_type))) if time_window_type != TypeOfTimeWindow.SELECT_ALL: for timestamp in timestamps: app_data.extend(struct.pack("!I", timestamp)) return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181) # waits for a specified amount of seconds and prints ". . ." for each second def wait_seconds(t: int): print("Waiting: ", end="") for x in range(t): time.sleep(1) print(". ", end="") print("")