Compare commits

...

3 Commits

Author SHA1 Message Date
88302ef7eb use alternating 0 and 1 for dummy data 2022-11-02 20:02:00 +01:00
622b2e4f21 now it works 2022-11-02 19:52:55 +01:00
331b67622a new test service commands 2022-11-02 19:49:07 +01:00
5 changed files with 67 additions and 15 deletions

2
deps/tmtccmd vendored

View File

@@ -21,13 +21,6 @@ def get_eive_service_op_code_dict() -> TmtcDefinitionWrapper:
info="PUS Service 5 Event",
op_code_entry=srv_5,
)
srv_17 = OpCodeEntry()
srv_17.add("0", "Ping Test")
def_wrapper.add_service(
name=CoreServiceList.SERVICE_17_ALT,
info="PUS Service 17 Test",
op_code_entry=srv_17,
)
call_all_definitions_providers(def_wrapper)
return def_wrapper

View File

@@ -84,11 +84,6 @@ def handle_default_procedure(
obj_id_man = get_object_ids()
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service_5_test_into(q=queue_helper)
if (
service == CoreServiceList.SERVICE_17
or service == CoreServiceList.SERVICE_17_ALT
):
return queue_helper.add_pus_tc(pack_service_17_ping_command())
if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_test_into(q=queue_helper)
if service == CustomServiceList.P60DOCK.value:
@@ -105,8 +100,6 @@ def handle_default_procedure(
if service == CustomServiceList.ACU.value:
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
# if service == CustomServiceList.BPX_BATTERY.value:
# return pack_bpx_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:

View File

@@ -1 +1,2 @@
from .acs_subsystem import add_acs_subsystem_cmds
from .test import add_test_defs

65
tmtc/test.py Normal file
View File

@@ -0,0 +1,65 @@
from spacepackets.ecss import PusTelecommand, PusServices
from tmtccmd.config import CoreServiceList
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.tc import service_provider
from tmtccmd.tc.decorator import ServiceProviderParams
class OpCodes:
PING = "ping"
TRIGGER_EVENT = "trig_event"
PING_WITH_DATA = "ping_with_data"
class Info:
PING = "Simple Ping and Connection Test"
TRIGGER_EVENT = "Trigger an event"
PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
@tmtc_definitions_provider
def add_test_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.PING, info=Info.PING)
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT)
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA)
defs.add_service(
name=CoreServiceList.SERVICE_17_ALT,
info="PUS 17 Test Service",
op_code_entry=oce,
)
@service_provider(CoreServiceList.SERVICE_17_ALT)
def pack_test_command(p: ServiceProviderParams):
info = p.info
q = p.queue_helper
if info.op_code == OpCodes.PING:
q.add_log_cmd("Sending PUS TC [17,1]")
q.add_pus_tc(pack_service_17_ping_command())
if info.op_code == OpCodes.TRIGGER_EVENT:
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
q.add_pus_tc(PusTelecommand(service=PusServices.S17_TEST, subservice=128))
if info.op_code == OpCodes.PING_WITH_DATA:
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
while True:
data_size = int(input("Please specify data size [0-1024]: "))
if data_size < 0 or data_size > 1024:
print("Invalid data size")
break
dummy_data = bytearray()
next_byte = True
for i in range(data_size):
dummy_data.append(int(next_byte))
next_byte = not next_byte
q.add_pus_tc(
PusTelecommand(
service=PusServices.S17_TEST, subservice=130, app_data=dummy_data
)
)