diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py index 347747f..470f640 100644 --- a/pus_tm/action_reply_handler.py +++ b/pus_tm/action_reply_handler.py @@ -6,99 +6,112 @@ from pus_tc.devs.ploc_supervisor import SupvActionIds from pus_tc.devs.star_tracker import StarTrackerActionIds from tmtccmd.logging import get_console_logger from tmtccmd.config.definitions import DataReplyUnpacked +from tmtccmd.tm import Service8FsfwTm +from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter LOGGER = get_console_logger() -def user_analyze_service_8_data( - object_id: bytes, action_id: int, custom_data: bytearray -) -> DataReplyUnpacked: +def handle_action_reply( + raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT +): + """Core Action reply handler + :return: """ - This function is called by the TMTC core if a Service 8 data reply (subservice 130) - is received. The user can return a tuple of two lists, where the first list - is a list of header strings to print and the second list is a list of values to print. - The TMTC core will take care of printing both lists and logging them. - - @param object_id: - @param action_id: - @param custom_data: - @return: - """ - if object_id == PDU_2_HANDLER_ID: - reply = DataReplyUnpacked() - reply.header_list = ["PDU2 Service 8 Reply"] - data_string = str() - for index in range(len(custom_data)): - data_string += str(hex(custom_data[index])) + " , " - data_string = data_string.rstrip() - data_string = data_string.rstrip(",") - data_string = data_string.rstrip() - reply.content_list = [data_string] - return reply - elif object_id == IMTQ_HANDLER_ID: - return handle_imtq_replies(action_id, custom_data) + tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm) + printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) + object_id = obj_id_dict.get(tm_packet.source_object_id) + custom_data = tm_packet.custom_data + action_id = tm_packet.action_id + generic_print_str = printer.generic_action_packet_tm_print(packet=tm_packet, obj_id=object_id) + print(generic_print_str) + printer.file_logger.info(generic_print_str) + if object_id == IMTQ_HANDLER_ID: + return handle_imtq_replies(action_id, printer, custom_data) elif object_id == PLOC_MPSOC_ID: - return handle_ploc_replies(action_id, custom_data) + return handle_ploc_replies(action_id, printer, custom_data) elif object_id == PLOC_SUPV_ID: - return handle_supervisor_replies(action_id, custom_data) + return handle_supervisor_replies(action_id, printer, custom_data) elif object_id == STAR_TRACKER_ID: - return handle_startracker_replies(action_id, custom_data) - return DataReplyUnpacked() + return handle_startracker_replies(action_id, printer, custom_data) -def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: - reply = DataReplyUnpacked() +def handle_imtq_replies( + action_id: int, + printer: FsfwTmTcPrinter, + custom_data: bytearray +): if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]: - reply.header_list = [ + header_list = [ "Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole", ] - x_dipole = struct.unpack("!H", custom_data[:2]) - y_dipole = struct.unpack("!H", custom_data[2:4]) - z_dipole = struct.unpack("!H", custom_data[4:6]) - reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]] - return reply + [ + x_dipole, + y_dipole, + z_dipole + ] = struct.unpack("!HHH", custom_data[0:6]) + content_list = [x_dipole, y_dipole, z_dipole] + print(header_list) + print(content_list) + printer.file_logger.info(header_list) + printer.file_logger.info(content_list) -def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: - reply = DataReplyUnpacked() +def handle_ploc_replies( + action_id: int, + printer: FsfwTmTcPrinter, + custom_data: bytearray +): if action_id == PlocReplyIds.tm_mem_read_report: - reply.header_list = [ + header_list = [ "PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data", ] - reply.content_list = [ + content_list = [ "0x" + custom_data[:4].hex(), struct.unpack("!H", custom_data[4:6])[0], "0x" + custom_data[6:10].hex(), ] - return reply + print(header_list) + print(content_list) + printer.file_logger.info(header_list) + printer.file_logger.info(content_list) def handle_supervisor_replies( - action_id: int, custom_data: bytearray -) -> DataReplyUnpacked: + action_id: int, + printer: FsfwTmTcPrinter, + custom_data: bytearray +): reply = DataReplyUnpacked() if action_id == SupvActionIds.DUMP_MRAM: - reply.header_list = ["MRAM Dump"] - reply.content_list = [custom_data[: len(custom_data)]] - return reply + header_list = ["MRAM Dump"] + content_list = [custom_data[: len(custom_data)]] + print(header_list) + print(content_list) + printer.file_logger.info(header_list) + printer.file_logger.info(content_list) def handle_startracker_replies( - action_id: int, custom_data: bytearray -) -> DataReplyUnpacked: - reply = DataReplyUnpacked() + action_id: int, + printer: FsfwTmTcPrinter, + custom_data: bytearray +): if action_id == StarTrackerActionIds.CHECKSUM: if len(custom_data) != 5: LOGGER.warning( "Star tracker reply has invalid length {0}".format(len(custom_data)) ) - return reply - reply.header_list = ["Checksum", "Checksum valid"] + return + header_list = ["Checksum", "Checksum valid"] print(custom_data[4]) checksum_valid_flag = custom_data[4] >> 8 - reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] - return reply + content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] + print(header_list) + print(content_list) + printer.file_logger.info(header_list) + printer.file_logger.info(content_list) diff --git a/pus_tm/factory_hook.py b/pus_tm/factory_hook.py index 01b5cab..b5a318a 100644 --- a/pus_tm/factory_hook.py +++ b/pus_tm/factory_hook.py @@ -1,9 +1,5 @@ +"""Core EIVE TM handler module """ -@brief This file transfers control of TM parsing to the user -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -from tmtccmd.tm.service_8_fsfw_functional_cmd import Service8FsfwTm from spacepackets.ecss.tm import PusTelemetry from tmtccmd.logging import get_console_logger from tmtccmd.logging.pus import ( @@ -20,9 +16,11 @@ from tmtccmd.utility.tmtc_printer import PrintFormats, FsfwTmTcPrinter from config.definitions import PUS_APID from config.object_ids import get_object_ids + from .event_handler import handle_event_packet from .verification_handler import handle_service_1_packet from .hk_handling import handle_hk_packet +from .action_reply_handler import handle_action_reply LOGGER = get_console_logger() @@ -55,8 +53,9 @@ def pus_factory_hook(raw_tm_packet: bytes): raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger ) elif service_type == 8: - tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet) - FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) + handle_action_reply( + raw_tm=raw_tm_packet, printer=FSFW_PRINTER, obj_id_dict=obj_id_dict + ) elif service_type == 17: tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet) FSFW_PRINTER.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index da1d1bf..0a62047 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -39,7 +39,7 @@ def handle_hk_packet( named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] - printer.generic_hk_print( + printer.generic_hk_tm_print( content_type=HkContentType.HK, object_id=named_obj_id, set_id=tm_packet.set_id, diff --git a/tmtccmd b/tmtccmd index 4d8e025..cf4cadc 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 4d8e025b0144a139904c1502f41fe5afcf1504b0 +Subproject commit cf4cadcbdc2c05aa7be2d0b9e6ec51d2fffa3115