From cdbd40363d82754414215b1d6e5b46fcf1e28e99 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 24 May 2022 14:27:29 +0200 Subject: [PATCH] added more tc sched commands --- pus_tc/cmd_definitions.py | 18 ++------- pus_tc/pus_11_tc_sched.py | 82 +++++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 22 deletions(-) diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index 6f918bc..057b71d 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -7,7 +7,7 @@ from tmtccmd.config import ( ) from tmtccmd.config.definitions import OpCodeOptionsT from tmtccmd.config.globals import get_default_service_op_code_dict -from common_tmtc.pus_tc.pus_11_tc_sched import OpCodes +from common_tmtc.pus_tc.pus_11_tc_sched import OpCodes, add_tc_sched_cmds def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: @@ -27,18 +27,6 @@ def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT: op_code_entry=op_code, ) - op_code = dict() - add_op_code_entry( - op_code_dict=op_code, - keys=OpCodes.INSERT, - info="Insertion Test", - options=generate_op_code_options(custom_timeout=0.5), - ) - add_op_code_entry(op_code_dict=op_code, keys=OpCodes.DELETE, info="Deletion Test") - add_service_op_code_entry( - srv_op_code_dict=service_op_code_dict, - name="11", - info="PUS Service 11 TC Scheduling", - op_code_entry=op_code, - ) + add_tc_sched_cmds(service_op_code_dict) + return service_op_code_dict diff --git a/pus_tc/pus_11_tc_sched.py b/pus_tc/pus_11_tc_sched.py index 6286128..721e904 100644 --- a/pus_tc/pus_11_tc_sched.py +++ b/pus_tc/pus_11_tc_sched.py @@ -3,15 +3,74 @@ import time from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.config import QueueCommands +from tmtccmd.config import ( + QueueCommands, + ServiceOpCodeDictT, + add_op_code_entry, + generate_op_code_options, + add_service_op_code_entry, +) from tmtccmd.pus.pus_17_test import pack_service_17_ping_command from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId +from tmtccmd.tc.pus_11_tc_sched import ( + generate_enable_tc_sched_cmd, + generate_disable_tc_sched_cmd, + generate_reset_tc_sched_cmd, +) from tmtccmd.tc.definitions import TcQueueT class OpCodes: - INSERT = ["0", "insert-test"] - DELETE = ["1", "del-test"] + ENABLE = ["0", "enable"] + DISABLE = ["1", "disable"] + RESET = ["2", "reset"] + + INSERT = ["12", "test-insert"] + DELETE = ["13", "del-test"] + + +class Info: + ENABLE = "Enable TC scheduling" + DISABLE = "Disable TC scheduling" + RESET = "Reset TC scheduling" + INSERT = "Test TC scheduling insertion" + DELETE = "Test TC scheduling deletion" + + +def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT): + op_code = dict() + add_op_code_entry( + op_code_dict=op_code, + keys=OpCodes.ENABLE, + info=Info.ENABLE, + options=generate_op_code_options(custom_timeout=2.0), + ) + add_op_code_entry( + op_code_dict=op_code, + keys=OpCodes.DISABLE, + info=Info.DISABLE, + options=generate_op_code_options(custom_timeout=2.0), + ) + add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET) + add_op_code_entry( + op_code_dict=op_code, + keys=OpCodes.INSERT, + info=Info.INSERT, + options=generate_op_code_options(custom_timeout=0.2), + ) + add_op_code_entry( + op_code_dict=op_code, + keys=OpCodes.DELETE, + info=Info.DELETE, + options=generate_op_code_options(custom_timeout=0.2), + ) + + add_service_op_code_entry( + srv_op_code_dict=cmd_dict, + name="11", + info="PUS Service 11 TC Scheduling", + op_code_entry=op_code, + ) def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): @@ -19,6 +78,7 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): tc_queue.appendleft( (QueueCommands.PRINT, "Testing Time-Tagged Command insertion") ) + tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) current_time = int(round(time.time())) # these TC[17,1] (ping commands) shall be inserted @@ -28,7 +88,7 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): pack_service_17_ping_command(1703), ] for idx, tc in enumerate(ping_tcs): - release_time = current_time + (idx + 1) * 5 + release_time = current_time + (idx + 2) * 5 time_tagged_tc = PusTelecommand( service=11, subservice=Subservices.TC_INSERT, @@ -36,7 +96,7 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): ) tc_queue.appendleft(time_tagged_tc.pack_command_tuple()) tc_queue.appendleft((QueueCommands.WAIT, 25)) - if op_code in ["1", "del-test"]: + if op_code in OpCodes.DELETE: tc_queue.appendleft( (QueueCommands.PRINT, "Testing Time-Tagged Command deletion") ) @@ -56,9 +116,17 @@ def pack_service_11_commands(op_code: str, tc_queue: TcQueueT): app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule), ssc=1105, ) - - # a TC[11,5] for 3rd inserted ping TC tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple()) + if op_code in OpCodes.ENABLE: + tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler")) + tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple()) + if op_code in OpCodes.DISABLE: + tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler")) + tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple()) + if op_code in OpCodes.RESET: + tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler")) + tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple()) + # a TC[11,5] for 3rd inserted ping TC # TODO: This should be an independent test # a TC[11,6] for some other previously inserted TCs # service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,