From 99165298d4ce73ad39590e99f5e9afbcc4f45dc3 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 28 Jul 2022 15:25:06 +0200 Subject: [PATCH] delete obsolete module, add log printout --- common.py | 65 ++++++++++++++++++++++++-------------------- pus_tc/tc_packing.py | 55 ------------------------------------- 2 files changed, 35 insertions(+), 85 deletions(-) delete mode 100644 pus_tc/tc_packing.py diff --git a/common.py b/common.py index a3b5907..7ad9bcc 100644 --- a/common.py +++ b/common.py @@ -123,38 +123,43 @@ class TcHandler(TcHandlerBase): def send_cb(self, params: SendCbParams): if params.entry.is_tc: if params.entry.entry_type == TcQueueEntryType.PUS_TC: - pus_tc_wrapper = params.entry.to_pus_tc_entry() - # TODO: All of this stuff should be done during queue insertion time - # This requires the queue helper to be optionally able to perform TC - # post-processing via a callback or something similar. Then an API can be - # added which is also able to pack time tagged TCs and stamping both TCs - pus_tc_wrapper.pus_tc.seq_count = ( - self.seq_count_provider.get_and_increment() + self.handle_tc_send_cb(params) + elif params.entry.entry_type == TcQueueEntryType.LOG: + LOGGER.info(params.entry.to_log_entry().log_str) + + def handle_tc_send_cb(self, params: SendCbParams): + pus_tc_wrapper = params.entry.to_pus_tc_entry() + # TODO: All of this stuff should be done during queue insertion time + # This requires the queue helper to be optionally able to perform TC + # post-processing via a callback or something similar. Then an API can be + # added which is also able to pack time tagged TCs and stamping both TCs + pus_tc_wrapper.pus_tc.seq_count = ( + self.seq_count_provider.get_and_increment() + ) + pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID + if ( + pus_tc_wrapper.pus_tc.service == 11 + and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT + ): + try: + pus_tc = PusTelecommand.unpack( + pus_tc_wrapper.pus_tc.app_data[4:] ) - pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID - if ( - pus_tc_wrapper.pus_tc.service == 11 - and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT - ): - try: - pus_tc = PusTelecommand.unpack( - pus_tc_wrapper.pus_tc.app_data[4:] - ) - self.pus_verificator.add_tc(pus_tc) - except ValueError as e: - LOGGER.warning( - f"Attempt of unpacking time tagged TC failed with exception {e}" - ) - # Add TC after Sequence Count stamping - self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc) - raw_tc = pus_tc_wrapper.pus_tc.pack() - self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) - tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" - LOGGER.info(tc_info_string) - self.file_logger.info( - f"{get_current_time_string(True)}: {tc_info_string}" + self.pus_verificator.add_tc(pus_tc) + except ValueError as e: + LOGGER.warning( + f"Attempt of unpacking time tagged TC failed with exception {e}" ) - params.com_if.send(raw_tc) + # Add TC after Sequence Count stamping + self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc) + raw_tc = pus_tc_wrapper.pus_tc.pack() + self.raw_logger.log_tc(pus_tc_wrapper.pus_tc) + tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" + LOGGER.info(tc_info_string) + self.file_logger.info( + f"{get_current_time_string(True)}: {tc_info_string}" + ) + params.com_if.send(raw_tc) def queue_finished_cb(self, info: ProcedureHelper): if info is not None and info.proc_type == TcQueueEntryType.PUS_TC: diff --git a/pus_tc/tc_packing.py b/pus_tc/tc_packing.py deleted file mode 100644 index a7589ce..0000000 --- a/pus_tc/tc_packing.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -@brief This file transfers control of TC packing to the user -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -import logging -import os -from collections import deque -from typing import Union - -from spacepackets.ecss.tc import PusTelecommand - -from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands -from tmtccmd.com_if.com_interface_base import CommunicationInterface -from tmtccmd.logging import get_console_logger, get_current_time_string -from tmtccmd.logging.pus import log_raw_pus_tc -from tmtccmd.tc.definitions import TcQueueT -from tmtccmd.config.definitions import CoreServiceList, QueueCommands -from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into -from tmtccmd.pus.pus_17_test import pack_generic_service17_test - -from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into -from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into -from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into -from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands -from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into -from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into - -LOGGER = get_console_logger() - - -def pre_tc_send_cb( - queue_entry: Union[bytes, QueueCommands], - com_if: CommunicationInterface, - queue_info: Union[PusTelecommand, any], - file_logger: logging.Logger, -): - if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray): - log_raw_pus_tc( - packet=queue_entry, - srv_subservice=(queue_info.service, queue_info.subservice), - ) - tc_info_string = f"Sent {queue_info}" - LOGGER.info(tc_info_string) - file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") - com_if.send(data=queue_entry) - - -def create_total_tc_queue_user() -> TcQueueT: - if not os.path.exists("log"): - os.mkdir("log") - tc_queue = deque() - pack_service_2_commands_into(op_code="0", tc_queue=tc_queue) - pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue) - return tc_queue