"""Command tree node and telecommand builders for the PUS service 17 test commands."""

import datetime
import math
import struct

from spacepackets.ecss import PusService, PusTelecommand
from tmtccmd.config import CmdTreeNode
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s17_test import create_service_17_ping_command
from tmtccmd.tmtc import DefaultPusQueueHelper

# Upper bound (inclusive) accepted for the ping-with-data payload size prompt.
_MAX_PING_DATA_SIZE = 1024


class OpCode:
    """Command path strings identifying the individual test commands."""

    PING = "ping"
    TRIGGER_EVENT = "trig_event"
    PING_WITH_DATA = "ping_with_data"
    SCHEDULE_PING = "sched_ping"


class Info:
    """Human-readable descriptions, one per entry in :class:`OpCode`."""

    PING = "Simple Ping and Connection Test"
    TRIGGER_EVENT = "Trigger an event"
    PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
    SCHEDULE_PING = "Schedule a ping"


def create_test_node() -> CmdTreeNode:
    """Build the ``test`` command tree node with one child per test command."""
    node = CmdTreeNode("test", "Test Commands")
    node.add_child(CmdTreeNode(OpCode.PING, Info.PING))
    node.add_child(CmdTreeNode(OpCode.TRIGGER_EVENT, Info.TRIGGER_EVENT))
    node.add_child(CmdTreeNode(OpCode.PING_WITH_DATA, Info.PING_WITH_DATA))
    node.add_child(CmdTreeNode(OpCode.SCHEDULE_PING, Info.SCHEDULE_PING))
    return node


def _prompt_data_size() -> int:
    """Ask on stdin for a payload size until a value in [0, 1024] is entered."""
    while True:
        raw = input("Please specify data size [0-1024]: ")
        try:
            data_size = int(raw)
        except ValueError:
            # Non-numeric input is treated like an out-of-range value.
            print("Invalid data size")
            continue
        if 0 <= data_size <= _MAX_PING_DATA_SIZE:
            return data_size
        print("Invalid data size")


def build_test_commands(q: DefaultPusQueueHelper, cmd_path: str):
    """Queue the telecommand(s) belonging to the given test command path.

    Some commands prompt interactively on stdin for additional parameters.
    A ``cmd_path`` matching none of the :class:`OpCode` values queues nothing.

    :param q: Queue helper which receives the log entry and the telecommand.
    :param cmd_path: Command path, compared against the :class:`OpCode` values.
    :raises ValueError: If the offset entered for the scheduled ping is not an
        integer.
    """
    if cmd_path == OpCode.PING:
        q.add_log_cmd("Sending PUS TC [17,1]")
        q.add_pus_tc(create_service_17_ping_command())
    elif cmd_path == OpCode.TRIGGER_EVENT:
        q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
        q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128))
    elif cmd_path == OpCode.SCHEDULE_PING:
        q.add_log_cmd("Sending scheduled PUS ping")
        # Release time is the current UTC time plus a user-specified offset.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        second_offset_to_now = input("Please specify offset to now in seconds: ")
        now += datetime.timedelta(seconds=int(second_offset_to_now))
        # UNIX seconds packed as a 4-byte big-endian unsigned integer.
        unix_stamp = struct.pack("!I", math.floor(now.timestamp()))
        print(f"Sending ping scheduled at {now}")
        # NOTE(review): subservice 128 is the same one used for the event
        # trigger above, not the [17,1] ping - confirm this is intended.
        ping = PusTelecommand(service=PusService.S17_TEST, subservice=128)
        q.add_pus_tc(
            create_time_tagged_cmd(
                release_time=unix_stamp,
                tc_to_insert=ping,
            )
        )
    elif cmd_path == OpCode.PING_WITH_DATA:
        # NOTE(review): the log text says [17, 129] but the telecommand below
        # uses subservice 130 - confirm which one is correct.
        q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
        data_size = _prompt_data_size()
        # Payload alternates 1, 0, 1, 0, ... starting with 1.
        dummy_data = bytearray((idx + 1) % 2 for idx in range(data_size))
        q.add_pus_tc(
            PusTelecommand(
                service=PusService.S17_TEST, subservice=130, app_data=dummy_data
            )
        )