"""Test command packers for PUS Service 11 (time-based telecommand scheduling)."""
import struct
import time

from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import QueueCommands
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
from tmtccmd.tc.definitions import TcQueueT


class OpCodes:
    """Accepted op code aliases for the Service 11 test procedures."""

    INSERT = ["0", "insert-test"]
    DELETE = ["1", "del-test"]


def pack_service_11_commands(op_code: str, tc_queue: TcQueueT):
    """Queue the Service 11 test procedure selected by ``op_code``.

    :param op_code: One of the aliases in :class:`OpCodes`. Unknown op codes
        leave the queue untouched.
    :param tc_queue: Queue which receives the command tuples via ``appendleft``.
    """
    if op_code in OpCodes.INSERT:
        _queue_insert_test(tc_queue)
    # Use the shared OpCodes definition instead of repeating the literal list.
    if op_code in OpCodes.DELETE:
        _queue_delete_test(tc_queue)


def _queue_insert_test(tc_queue: TcQueueT):
    """Queue three time-tagged ping TCs, released 5, 10 and 15 after now."""
    tc_queue.appendleft(
        (QueueCommands.PRINT, "Testing Time-Tagged Command insertion")
    )
    current_time = int(round(time.time()))

    # These TC[17,1] (ping commands) shall be inserted.
    ping_tcs = [
        pack_service_17_ping_command(1701),
        pack_service_17_ping_command(1702),
        pack_service_17_ping_command(1703),
    ]
    for idx, tc in enumerate(ping_tcs):
        release_time = current_time + (idx + 1) * 5
        time_tagged_tc = PusTelecommand(
            service=11,
            subservice=Subservices.TC_INSERT,
            app_data=pack_insert_tc_app_data(struct.pack("!I", release_time), tc),
        )
        tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
    # Single wait entry after all insertions; 25 exceeds the last release
    # offset of 15 (presumably seconds -- confirm against the queue handler).
    tc_queue.appendleft((QueueCommands.WAIT, 25))


def _queue_delete_test(tc_queue: TcQueueT):
    """Queue the insertion of one ping TC followed by a TC deleting it again."""
    tc_queue.appendleft(
        (QueueCommands.PRINT, "Testing Time-Tagged Command deletion")
    )
    current_time = int(round(time.time()))
    tc_to_schedule = pack_service_17_ping_command(1703)
    time_tagged_tc = PusTelecommand(
        service=11,
        subservice=Subservices.TC_INSERT,
        app_data=pack_insert_tc_app_data(
            struct.pack("!I", current_time + 20), tc_to_schedule
        ),
    )
    tc_queue.appendleft(time_tagged_tc.pack_command_tuple())

    # A TC[11,5] for the ping TC inserted above.
    del_time_tagged_tcs = PusTelecommand(
        service=11,
        subservice=Subservices.TC_DELETE,
        app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
        ssc=1105,
    )
    tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple())

    # TODO: This should be an independent test
    # a TC[11,6] for some other previously inserted TCs
    # service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
    #                                          current_time+45, current_time+55)
    # tc_queue.appendleft(service_11_6_tc.pack_command_tuple())

    # TODO: This should be an independent test
    # a TC[11,7] for another previously inserted TC
    # service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4)
    # tc_queue.appendleft(service_11_7_tc.pack_command_tuple())

    # TODO: This should be an independent test
    # a TC[11,8] with offset time of 20s
    # service_11_8_tc = build_filter_timeshift_tc(
    #     20,
    #     TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
    #     current_time + 45,
    #     current_time + 55,
    # )
    # tc_queue.appendleft((service_11_8_tc.pack_command_tuple()))


def pack_insert_tc_app_data(
    release_time: bytes, tc_to_insert: PusTelecommand
) -> bytearray:
    """Pack another TC into the application data of an insert activity TC[11,4].

    :param release_time: Absolute time when the TC shall be released/run,
        already serialized to bytes by the caller.
    :param tc_to_insert: The TC which shall be inserted.
    :return: The release time followed by the packed telecommand.
    """
    app_data = bytearray()
    # Pack the release time ...
    app_data.extend(release_time)
    # ... followed by the TC.
    app_data.extend(tc_to_insert.pack())
    return app_data


def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes:
    """Return the packed request ID built from ``tc``.

    Used above as the application data of the ``Subservices.TC_DELETE`` command.
    """
    return TcSchedReqId.build_from_tc(tc).pack()


def _pack_time_window_filter(
    time_window_type: TypeOfTimeWindow, timestamps
) -> bytearray:
    """Pack the window type and, unless it is ``SELECT_ALL``, every timestamp.

    All fields are serialized as big-endian unsigned 32-bit integers.
    """
    packed = bytearray(struct.pack("!I", int(time_window_type)))
    if time_window_type != TypeOfTimeWindow.SELECT_ALL:
        for timestamp in timestamps:
            packed.extend(struct.pack("!I", timestamp))
    return packed


def build_filter_delete_tc(
    time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
    """Build a TC[11,6] whose application data is a packed time window filter.

    :param time_window_type: Kind of time window; written first.
    :param timestamps: Window timestamps, ignored for ``SELECT_ALL``.
    :return: The telecommand, using the fixed source sequence count 1161.
    """
    app_data = _pack_time_window_filter(time_window_type, timestamps)
    return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161)


def pack_corresponding_timeshift_app_data(
    time_delta: bytes, tc: PusTelecommand, ssc: int
) -> bytearray:
    """Pack a time delta followed by the request ID built from ``tc``.

    :param time_delta: Time shift, already serialized to bytes by the caller.
    :param tc: Telecommand from which the request ID is built.
    :param ssc: Currently unused; kept so existing callers keep working.
    :return: ``time_delta`` followed by the packed request ID.
    """
    req_id = TcSchedReqId.build_from_tc(tc)
    app_data = bytearray()
    app_data.extend(time_delta)
    app_data.extend(req_id.pack())
    return app_data


def build_filter_timeshift_tc(
    time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
    """Build a TC[11,8] carrying a time offset and a packed time window filter.

    :param time_offset: Offset, packed as a big-endian unsigned 32-bit integer.
    :param time_window_type: Kind of time window; written after the offset.
    :param timestamps: Window timestamps, ignored for ``SELECT_ALL``.
    :return: The telecommand, using the fixed source sequence count 1181.
    """
    app_data = bytearray(struct.pack("!I", time_offset))
    app_data.extend(_pack_time_window_filter(time_window_type, timestamps))
    return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181)


def wait_seconds(t: int):
    """Block for ``t`` seconds, printing one ``". "`` per elapsed second."""
    print("Waiting: ", end="", flush=True)
    for _ in range(t):
        time.sleep(1)
        # Flush explicitly: without a newline, buffered stdout would hold the
        # dots back until the final print and defeat the progress display.
        print(". ", end="", flush=True)
    print("")