From 331b67622a71085c76a5c6e17f712a8ef7bc52c0 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 2 Nov 2022 19:49:07 +0100 Subject: [PATCH] new test service commands --- deps/tmtccmd | 2 +- pus_tc/procedure_packer.py | 7 ----- tmtc/test.py | 59 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 tmtc/test.py diff --git a/deps/tmtccmd b/deps/tmtccmd index f3c9501..9f16ae4 160000 --- a/deps/tmtccmd +++ b/deps/tmtccmd @@ -1 +1 @@ -Subproject commit f3c9501891b582f5b4c61aec5eb939d05a2e24d2 +Subproject commit 9f16ae401e7bc4f1125d04ac9ff6584213021691 diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py index 7715c4e..e248e6b 100644 --- a/pus_tc/procedure_packer.py +++ b/pus_tc/procedure_packer.py @@ -84,11 +84,6 @@ def handle_default_procedure( obj_id_man = get_object_ids() if service == CoreServiceList.SERVICE_5.value: return pack_generic_service_5_test_into(q=queue_helper) - if ( - service == CoreServiceList.SERVICE_17 - or service == CoreServiceList.SERVICE_17_ALT - ): - return queue_helper.add_pus_tc(pack_service_17_ping_command()) if service == CoreServiceList.SERVICE_200.value: return pack_service_200_test_into(q=queue_helper) if service == CustomServiceList.P60DOCK.value: @@ -105,8 +100,6 @@ def handle_default_procedure( if service == CustomServiceList.ACU.value: object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code) - # if service == CustomServiceList.BPX_BATTERY.value: - # return pack_bpx_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.TCS.value: return pack_tcs_sys_commands(q=queue_helper, op_code=op_code) if service == CustomServiceList.TMP1075_1.value: diff --git a/tmtc/test.py b/tmtc/test.py new file mode 100644 index 0000000..7e2778b --- /dev/null +++ b/tmtc/test.py @@ -0,0 +1,59 @@ +from spacepackets.ecss import PusTelecommand, PusServices +from tmtccmd.config import CoreServiceList +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) +from tmtccmd.pus.pus_17_test import pack_service_17_ping_command +from tmtccmd.tc import service_provider +from tmtccmd.tc.decorator import ServiceProviderParams + + +class OpCodes: + PING = "ping" + TRIGGER_EVENT = "trig_event" + PING_WITH_DATA = "ping_with_data" + + +class Info: + PING = "Simple Ping and Connection Test" + TRIGGER_EVENT = "Trigger an event" + PING_WITH_DATA = "Ping with data. Size of sent data is sent back" + + +@tmtc_definitions_provider +def pack_test_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.PING, info=Info.PING) + oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) + oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) + + defs.add_service( + name=CoreServiceList.SERVICE_17, info="PUS 17 Test Service", op_code_entry=oce + ) + + +@service_provider(CoreServiceList.SERVICE_17) +def pack_test_command(p: ServiceProviderParams): + info = p.info + q = p.queue_helper + if info.op_code == OpCodes.PING: + q.add_log_cmd("Sending PUS TC [17,1]") + q.add_pus_tc(pack_service_17_ping_command()) + if info.op_code == OpCodes.TRIGGER_EVENT: + q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") + q.add_pus_tc(PusTelecommand(service=PusServices.S17_TEST, subservice=128)) + if info.op_code == OpCodes.PING_WITH_DATA: + q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") + while True: + data_size = int(input("Please specify data size [0-1024]: ")) + if data_size < 0 or data_size > 1024: + print("Invalid data size") + break + dummy_data = bytes(data_size) + q.add_pus_tc( + PusTelecommand( + service=PusServices.S17_TEST, subservice=130, app_data=dummy_data + ) + )