from spacepackets.ecss import PusTelecommand, PusService from tmtccmd.config import CoreServiceList from tmtccmd.config.tmtc import ( tmtc_definitions_provider, TmtcDefinitionWrapper, OpCodeEntry, ) from tmtccmd.pus.s17_test import pack_service_17_ping_command from tmtccmd.tc import service_provider from tmtccmd.tc.decorator import ServiceProviderParams class OpCodes: PING = "ping" TRIGGER_EVENT = "trig_event" PING_WITH_DATA = "ping_with_data" class Info: PING = "Simple Ping and Connection Test" TRIGGER_EVENT = "Trigger an event" PING_WITH_DATA = "Ping with data. Size of sent data is sent back" @tmtc_definitions_provider def add_test_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCodes.PING, info=Info.PING) oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) defs.add_service( name=CoreServiceList.SERVICE_17_ALT, info="PUS 17 Test Service", op_code_entry=oce, ) @service_provider(CoreServiceList.SERVICE_17_ALT) def pack_test_command(p: ServiceProviderParams): info = p.info q = p.queue_helper if info.op_code == OpCodes.PING: q.add_log_cmd("Sending PUS TC [17,1]") q.add_pus_tc(pack_service_17_ping_command()) if info.op_code == OpCodes.TRIGGER_EVENT: q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128)) if info.op_code == OpCodes.PING_WITH_DATA: q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") while True: data_size = int(input("Please specify data size [0-1024]: ")) if data_size < 0 or data_size > 1024: print("Invalid data size") break dummy_data = bytearray() next_byte = True for i in range(data_size): dummy_data.append(int(next_byte)) next_byte = not next_byte q.add_pus_tc( PusTelecommand( service=PusService.S17_TEST, subservice=130, app_data=dummy_data ) )