#!/usr/bin/env python3 """Example client for the sat-rs example application""" import enum import logging import struct import sys import time from typing import Optional import datetime import json import pprint import tmtccmd from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm from spacepackets.ccsds.time import CdsShortTimestamp from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd from tmtccmd.core.base import BackendRequest from tmtccmd.pus import VerificationWrapper from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase from tmtccmd.com import ComInterface from tmtccmd.config import ( default_json_path, SetupParams, HookBase, TmtcDefinitionWrapper, CoreServiceList, OpCodeEntry, params_to_procedure_conversion, ) from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper from tmtccmd.logging import add_colorlog_console_logger from tmtccmd.logging.pus import ( RegularTmtcLogWrapper, RawTmtcTimedLogWrapper, TimedLogWhen, ) from tmtccmd.tc import ( TcQueueEntryType, ProcedureWrapper, TcProcedureType, FeedWrapper, SendCbParams, DefaultPusQueueHelper, QueueWrapper, ) from tmtccmd.tm.pus_5_fsfw_event import Service5Tm from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util.obj_id import ObjectIdDictT from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter _LOGGER = logging.getLogger(__name__) EXAMPLE_PUS_APID = 0x02 class SatRsConfigHook(HookBase): def __init__(self, json_cfg_path: str): super().__init__(json_cfg_path=json_cfg_path) def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( create_com_interface_default, create_com_interface_cfg_default, ) cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, space_packet_ids=None, ) return create_com_interface_default(cfg) def get_tmtc_definitions(self) -> TmtcDefinitionWrapper: from tmtccmd.config.globals import get_default_tmtc_defs defs = get_default_tmtc_defs() srv_5 = OpCodeEntry() srv_5.add("0", "Event Test") defs.add_service( name=CoreServiceList.SERVICE_5.value, info="PUS Service 5 Event", op_code_entry=srv_5, ) srv_17 = OpCodeEntry() srv_17.add("0", "Ping Test") defs.add_service( name=CoreServiceList.SERVICE_17_ALT, info="PUS Service 17 Test", op_code_entry=srv_17, ) srv_3 = OpCodeEntry() srv_3.add(HkOpCodes.GENERATE_ONE_SHOT, "Generate AOCS one shot HK") defs.add_service( name=CoreServiceList.SERVICE_3, info="PUS Service 3 Housekeeping", op_code_entry=srv_3, ) srv_8 = OpCodeEntry() srv_8.add("1", "Camera Image Request") defs.add_service( name=CoreServiceList.SERVICE_8, info="PUS Service 8 Action", op_code_entry=srv_8, ) srv_11 = OpCodeEntry() srv_11.add("0", "Scheduled TC Test") srv_11.add("1", "Scheduled Camera Request TC") defs.add_service( name=CoreServiceList.SERVICE_11, info="PUS Service 11 TC Scheduling", op_code_entry=srv_11, ) return defs def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): _LOGGER.info("Mode operation hook was called") pass def get_object_ids(self) -> ObjectIdDictT: from tmtccmd.config.objects import get_core_object_ids return get_core_object_ids() class PusHandler(SpecificApidHandlerBase): def __init__( self, verif_wrapper: VerificationWrapper, printer: FsfwTmTcPrinter, raw_logger: RawTmtcTimedLogWrapper, ): super().__init__(EXAMPLE_PUS_APID, None) self.printer = printer self.raw_logger = raw_logger self.verif_wrapper = verif_wrapper def handle_tm(self, packet: bytes, _user_args: any): try: tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) except ValueError as e: _LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") raise e service = tm_packet.service dedicated_handler = False if service == 1: tm_packet = Service1Tm.unpack( data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) ) res = self.verif_wrapper.add_tm(tm_packet) if res is None: _LOGGER.info( f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" ) _LOGGER.warning( f"No matching telecommand found for {tm_packet.tc_req_id}" ) else: self.verif_wrapper.log_to_console(tm_packet, res) self.verif_wrapper.log_to_file(tm_packet, res) dedicated_handler = True if service == 3: # _LOGGER.info("No handling for HK packets implemented") # _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) if pus_tm.subservice == 25: if len(pus_tm.source_data) < 8: raise ValueError("No addressable ID in HK packet") json_str = str(pus_tm.source_data[8:].decode('utf8')) json_object = json.loads(json_str) pprint.pprint(json_object) dedicated_handler = True if service == 5: tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty()) if service == 17: tm_packet = Service17Tm.unpack( packet, time_reader=CdsShortTimestamp.empty() ) dedicated_handler = True if tm_packet.subservice == 2: self.printer.file_logger.info("Received Ping Reply TM[17,2]") _LOGGER.info("Received Ping Reply TM[17,2]") else: self.printer.file_logger.info( f"Received Test Packet with unknown subservice {tm_packet.subservice}" ) _LOGGER.info( f"Received Test Packet with unknown subservice {tm_packet.subservice}" ) if tm_packet is None: _LOGGER.info( f"The service {service} is not implemented in Telemetry Factory" ) tm_packet = PusTelemetry.unpack( packet, time_reader=CdsShortTimestamp.empty() ) self.raw_logger.log_tm(tm_packet) if not dedicated_handler and tm_packet is not None: pass # self.printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) def make_addressable_id(target_id: int, unique_id: int) -> bytes: byte_string = bytearray(struct.pack("!I", target_id)) byte_string.extend(struct.pack("!I", unique_id)) return byte_string def read_addressable_id(data: bytes) -> tuple[int, int]: target_id = struct.unpack("!I", data[0:4])[0] set_id = struct.unpack("!I", data[4:8])[0] return (target_id, set_id) class CustomServiceList(enum.IntEnum): ACS = "acs" class RequestTargetId(enum.IntEnum): ACS = 1 PLD = 2 class AcsHkIds(enum.IntEnum): MGM_SET = 1 class HkOpCodes: GENERATE_ONE_SHOT = ["0", "oneshot"] def make_target_id(target_id: int) -> bytes: byte_string = bytearray(struct.pack("!I", target_id)) return byte_string class TcHandler(TcHandlerBase): def __init__( self, seq_count_provider: FileSeqCountProvider, verif_wrapper: VerificationWrapper, ): super(TcHandler, self).__init__() self.seq_count_provider = seq_count_provider self.verif_wrapper = verif_wrapper self.queue_helper = DefaultPusQueueHelper( queue_wrapper=QueueWrapper.empty(), tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, pus_verificator=self.verif_wrapper.pus_verificator, default_pus_apid=EXAMPLE_PUS_APID, ) def send_cb(self, send_params: SendCbParams): entry_helper = send_params.entry if entry_helper.is_tc: if entry_helper.entry_type == TcQueueEntryType.PUS_TC: pus_tc_wrapper = entry_helper.to_pus_tc_entry() raw_tc = pus_tc_wrapper.pus_tc.pack() _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") send_params.com_if.send(raw_tc) elif entry_helper.entry_type == TcQueueEntryType.LOG: log_entry = entry_helper.to_log_entry() _LOGGER.info(log_entry.log_str) def queue_finished_cb(self, helper: ProcedureWrapper): if helper.proc_type == TcProcedureType.DEFAULT: def_proc = helper.to_def_procedure() _LOGGER.info( f"Queue handling finished for service {def_proc.service} and " f"op code {def_proc.op_code}" ) def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper): q = self.queue_helper q.queue_wrapper = wrapper.queue_wrapper if helper.proc_type == TcProcedureType.DEFAULT: def_proc = helper.to_def_procedure() service = def_proc.service op_code = def_proc.op_code if ( service == CoreServiceList.SERVICE_17 or service == CoreServiceList.SERVICE_17_ALT ): q.add_log_cmd("Sending PUS ping telecommand") return q.add_pus_tc(PusTelecommand(service=17, subservice=1)) if service == CustomServiceList.ACS: if op_code == "set_mode": q.add_log_cmd() q.add_pus_tc() if service == CoreServiceList.SERVICE_11: if op_code == "0": q.add_log_cmd("Sending PUS scheduled TC telecommand") crt_time = CdsShortTimestamp.from_now() time_stamp = crt_time + datetime.timedelta(seconds=10) time_stamp = time_stamp.pack() return q.add_pus_tc( create_time_tagged_cmd( time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID, ) ) if op_code == "1": q.add_log_cmd("Sending PUS scheduled TC telecommand") crt_time = CdsShortTimestamp.from_now() time_stamp = crt_time + datetime.timedelta(seconds=10) time_stamp = time_stamp.pack() return q.add_pus_tc( create_time_tagged_cmd( time_stamp, PusTelecommand(service=8, subservice=1, app_data=make_target_id(RequestTargetId.PLD)), apid=EXAMPLE_PUS_APID, ) ) if service == CoreServiceList.SERVICE_8: q.add_log_cmd("Sending PUS action request telecommand") return q.add_pus_tc( PusTelecommand(service=8, subservice=1, app_data=make_target_id(RequestTargetId.PLD)) ) if service == CoreServiceList.SERVICE_3: if op_code in HkOpCodes.GENERATE_ONE_SHOT: q.add_log_cmd("Sending HK one shot request") q.add_pus_tc( generate_one_hk_command( make_addressable_id(RequestTargetId.ACS, AcsHkIds.MGM_SET) ) ) pass def main(): add_colorlog_console_logger(_LOGGER) tmtccmd.init_printout(False) hook_obj = SatRsConfigHook(json_cfg_path=default_json_path()) parser_wrapper = PreArgsParsingWrapper() parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parser() parser_wrapper.add_def_proc_args() params = SetupParams() post_args_wrapper = parser_wrapper.parse(hook_obj, params) proc_wrapper = ProcedureParamsWrapper() if post_args_wrapper.use_gui: post_args_wrapper.set_params_without_prompts(proc_wrapper) else: post_args_wrapper.set_params_with_prompts(proc_wrapper) params.apid = EXAMPLE_PUS_APID setup_args = SetupWrapper( hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper ) # Create console logger helper and file loggers tmtc_logger = RegularTmtcLogWrapper() printer = FsfwTmTcPrinter(tmtc_logger.logger) raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1) verificator = PusVerificator() verification_wrapper = VerificationWrapper( verificator, _LOGGER, printer.file_logger ) # Create primary TM handler and add it to the CCSDS Packet Handler tm_handler = PusHandler(verification_wrapper, printer, raw_logger) ccsds_handler = CcsdsTmHandler(generic_handler=None) ccsds_handler.add_apid_handler(tm_handler) # Create TC handler seq_count_provider = PusFileSeqCountProvider() tc_handler = TcHandler(seq_count_provider, verification_wrapper) tmtccmd.setup(setup_args=setup_args) init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper) tmtc_backend = tmtccmd.create_default_tmtc_backend( setup_wrapper=setup_args, tm_handler=ccsds_handler, tc_handler=tc_handler, init_procedure=init_proc, ) tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj) try: while True: state = tmtc_backend.periodic_op(None) if state.request == BackendRequest.TERMINATION_NO_ERROR: sys.exit(0) elif state.request == BackendRequest.DELAY_IDLE: _LOGGER.info("TMTC Client in IDLE mode") time.sleep(3.0) elif state.request == BackendRequest.DELAY_LISTENER: time.sleep(0.8) elif state.request == BackendRequest.DELAY_CUSTOM: if state.next_delay.total_seconds() <= 0.4: time.sleep(state.next_delay.total_seconds()) else: time.sleep(0.4) elif state.request == BackendRequest.CALL_NEXT: pass except KeyboardInterrupt: sys.exit(0) if __name__ == "__main__": main()