From 3cca54f66fe72f292787499e1e9bf2497a2cfdbf Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 14:50:37 +0200 Subject: [PATCH] moved some helpers into framework --- pus_tc/pus_11_tc_sched.py | 92 ++++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/pus_tc/pus_11_tc_sched.py b/pus_tc/pus_11_tc_sched.py index 721e904..841d55d 100644 --- a/pus_tc/pus_11_tc_sched.py +++ b/pus_tc/pus_11_tc_sched.py @@ -16,6 +16,7 @@ from tmtccmd.tc.pus_11_tc_sched import ( generate_enable_tc_sched_cmd, generate_disable_tc_sched_cmd, generate_reset_tc_sched_cmd, + pack_time_tagged_tc_app_data, ) from tmtccmd.tc.definitions import TcQueueT @@ -25,16 +26,18 @@ class OpCodes: DISABLE = ["1", "disable"] RESET = ["2", "reset"] - INSERT = ["12", "test-insert"] - DELETE = ["13", "del-test"] + TEST_INSERT = ["12", "test-insert"] + TEST_DELETE = ["13", "test-del"] + TEST_RESET = ["14", "test-clear"] class Info: ENABLE = "Enable TC scheduling" DISABLE = "Disable TC scheduling" RESET = "Reset TC scheduling" - INSERT = "Test TC scheduling insertion" - DELETE = "Test TC scheduling deletion" + TEST_INSERT = "Test TC scheduling insertion" + TEST_DELETE = "Test TC scheduling deletion" + TEST_RESET = "Test TC scheduling reset command" def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): @@ -54,17 +57,22 @@ def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET) add_op_code_entry( op_code_dict=op_code, - keys=OpCodes.INSERT, - info=Info.INSERT, + keys=OpCodes.TEST_INSERT, + info=Info.TEST_INSERT, options=generate_op_code_options(custom_timeout=0.2), ) add_op_code_entry( op_code_dict=op_code, - keys=OpCodes.DELETE, - info=Info.DELETE, + keys=OpCodes.TEST_DELETE, + info=Info.TEST_DELETE, + options=generate_op_code_options(custom_timeout=0.2), + ) + add_op_code_entry( + op_code_dict=op_code, + keys=OpCodes.TEST_RESET, + info=Info.TEST_RESET, options=generate_op_code_options(custom_timeout=0.2), ) - add_service_op_code_entry( srv_op_code_dict=cmd_dict, name="11", @@ -73,39 +81,41 @@ def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): ) -def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): - if op_code in OpCodes.INSERT: - tc_queue.appendleft( - (QueueCommands.PRINT, "Testing Time-Tagged Command insertion") - ) - tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) - current_time = int(round(time.time())) +def __generic_pack_three_time_tagged_cmds(tc_queue: TcQueueT): + tc_queue.appendleft((QueueCommands.PRINT, "Testing Time-Tagged Command insertion")) + tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) + current_time = int(round(time.time())) - # these TC[17,1] (ping commands) shall be inserted - ping_tcs = [ - pack_service_17_ping_command(1701), - pack_service_17_ping_command(1702), - pack_service_17_ping_command(1703), - ] - for idx, tc in enumerate(ping_tcs): - release_time = current_time + (idx + 2) * 5 - time_tagged_tc = PusTelecommand( - service=11, - subservice=Subservices.TC_INSERT, - app_data=pack_insert_tc_app_data(struct.pack("!I", release_time), tc), - ) - tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 25)) - if op_code in OpCodes.DELETE: + # these TC[17,1] (ping commands) shall be inserted + ping_tcs = [ + pack_service_17_ping_command(1701), + pack_service_17_ping_command(1702), + pack_service_17_ping_command(1703), + ] + for idx, tc in enumerate(ping_tcs): + release_time = current_time + (idx + 2) * 5 + time_tagged_tc = PusTelecommand( + service=11, + subservice=Subservices.TC_INSERT, + app_data=pack_time_tagged_tc_app_data(struct.pack("!I", release_time), tc), + ) + tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) + tc_queue.appendleft((QueueCommands.WAIT, 25)) + + +def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): + if op_code in OpCodes.TEST_INSERT: tc_queue.appendleft( (QueueCommands.PRINT, "Testing Time-Tagged Command deletion") ) + __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) + if op_code in OpCodes.TEST_DELETE: current_time = int(round(time.time())) tc_to_schedule = pack_service_17_ping_command(1703) time_tagged_tc = PusTelecommand( service=11, subservice=Subservices.TC_INSERT, - app_data=pack_insert_tc_app_data( + app_data=pack_time_tagged_tc_app_data( struct.pack("!I", current_time + 20), tc_to_schedule ), ) @@ -126,6 +136,10 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): if op_code in OpCodes.RESET: tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler")) tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) + if op_code in OpCodes.TEST_RESET: + tc_queue.appendleft((QueueCommands.PRINT, "Testing Reset command")) + __generic_pack_three_time_tagged_cmds(tc_queue=tc_queue) + tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) # a TC[11,5] for 3rd inserted ping TC # TODO: This should be an independent test # a TC[11,6] for some other previously inserted TCs @@ -149,18 +163,6 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): # tc_queue.appendleft((service_11_8_tc.pack_command_tuple())) -# this function packs another TC into an insert activity TC[11,4] -# parameter: release_time: Absolute time when TC shall be released/run -# parameter tc_to_insert: The TC which shall be inserted -def pack_insert_tc_app_data(release_time: bytes, tc_to_insert: PusTelecommand) -> bytes: - app_data = bytearray() - # pack the release time - app_data.extend(release_time) - # followed by the tc - app_data.extend(tc_to_insert.pack()) - return app_data - - def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes: return TcSchedReqId.build_from_tc(tc).pack()