new test service commands
This commit is contained in:
parent
074eb82e78
commit
331b67622a
2
deps/tmtccmd
vendored
2
deps/tmtccmd
vendored
@ -1 +1 @@
|
||||
Subproject commit f3c9501891b582f5b4c61aec5eb939d05a2e24d2
|
||||
Subproject commit 9f16ae401e7bc4f1125d04ac9ff6584213021691
|
@ -84,11 +84,6 @@ def handle_default_procedure(
|
||||
obj_id_man = get_object_ids()
|
||||
if service == CoreServiceList.SERVICE_5.value:
|
||||
return pack_generic_service_5_test_into(q=queue_helper)
|
||||
if (
|
||||
service == CoreServiceList.SERVICE_17
|
||||
or service == CoreServiceList.SERVICE_17_ALT
|
||||
):
|
||||
return queue_helper.add_pus_tc(pack_service_17_ping_command())
|
||||
if service == CoreServiceList.SERVICE_200.value:
|
||||
return pack_service_200_test_into(q=queue_helper)
|
||||
if service == CustomServiceList.P60DOCK.value:
|
||||
@ -105,8 +100,6 @@ def handle_default_procedure(
|
||||
if service == CustomServiceList.ACU.value:
|
||||
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
|
||||
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
|
||||
# if service == CustomServiceList.BPX_BATTERY.value:
|
||||
# return pack_bpx_commands(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.TCS.value:
|
||||
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.TMP1075_1.value:
|
||||
|
59
tmtc/test.py
Normal file
59
tmtc/test.py
Normal file
@ -0,0 +1,59 @@
|
||||
from spacepackets.ecss import PusTelecommand, PusServices
|
||||
from tmtccmd.config import CoreServiceList
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
|
||||
|
||||
class OpCodes:
|
||||
PING = "ping"
|
||||
TRIGGER_EVENT = "trig_event"
|
||||
PING_WITH_DATA = "ping_with_data"
|
||||
|
||||
|
||||
class Info:
|
||||
PING = "Simple Ping and Connection Test"
|
||||
TRIGGER_EVENT = "Trigger an event"
|
||||
PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def pack_test_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCodes.PING, info=Info.PING)
|
||||
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT)
|
||||
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA)
|
||||
|
||||
defs.add_service(
|
||||
name=CoreServiceList.SERVICE_17, info="PUS 17 Test Service", op_code_entry=oce
|
||||
)
|
||||
|
||||
|
||||
@service_provider(CoreServiceList.SERVICE_17)
|
||||
def pack_test_command(p: ServiceProviderParams):
|
||||
info = p.info
|
||||
q = p.queue_helper
|
||||
if info.op_code == OpCodes.PING:
|
||||
q.add_log_cmd("Sending PUS TC [17,1]")
|
||||
q.add_pus_tc(pack_service_17_ping_command())
|
||||
if info.op_code == OpCodes.TRIGGER_EVENT:
|
||||
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
|
||||
q.add_pus_tc(PusTelecommand(service=PusServices.S17_TEST, subservice=128))
|
||||
if info.op_code == OpCodes.PING_WITH_DATA:
|
||||
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
|
||||
while True:
|
||||
data_size = int(input("Please specify data size [0-1024]: "))
|
||||
if data_size < 0 or data_size > 1024:
|
||||
print("Invalid data size")
|
||||
break
|
||||
dummy_data = bytes(data_size)
|
||||
q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=PusServices.S17_TEST, subservice=130, app_data=dummy_data
|
||||
)
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user