From e009167784dd7f5aa037a6862eb89ec292aa1a76 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 5 Jul 2022 02:12:54 +0200 Subject: [PATCH] api update done --- .gitignore | 1 + config/custom_hooks.py | 14 - config/custom_mode_op.py | 4 +- config/{hook_implementations.py => hook.py} | 2 +- deps/tmtccmd | 2 +- gomspace/gomspace_common.py | 12 +- pus_tc/cmd_definitions.py | 749 +++++++------------- pus_tc/devs/acu.py | 35 +- pus_tc/devs/ccsds_handler.py | 4 +- pus_tc/devs/gps.py | 18 +- pus_tc/devs/heater.py | 40 +- pus_tc/devs/imtq.py | 4 +- pus_tc/devs/p60dock.py | 11 +- pus_tc/devs/pcdu.py | 300 ++------ pus_tc/devs/pdu1.py | 2 +- pus_tc/devs/pdu2.py | 2 +- pus_tc/devs/ploc_memory_dumper.py | 4 +- pus_tc/devs/ploc_mpsoc.py | 8 +- pus_tc/devs/ploc_supervisor.py | 6 +- pus_tc/devs/plpcdu.py | 99 +-- pus_tc/devs/rad_sensor.py | 35 +- pus_tc/devs/reaction_wheels.py | 60 +- pus_tc/devs/rtd.py | 22 +- pus_tc/devs/star_tracker.py | 4 +- pus_tc/devs/str_img_helper.py | 4 +- pus_tc/devs/syrlinks_hk_handler.py | 10 +- pus_tc/devs/tmp1075.py | 4 +- pus_tc/procedure_packer.py | 22 +- pus_tc/prompt_parameters.py | 34 +- pus_tc/system/controllers.py | 26 +- pus_tc/system/core.py | 50 +- pus_tc/system/proc.py | 14 +- pus_tm/action_reply_handler.py | 2 - pus_tm/devs/gyros.py | 8 +- pus_tm/devs/mgms.py | 8 +- pus_tm/devs/reaction_wheels.py | 4 +- pus_tm/devs/sus.py | 4 +- pus_tm/hk_handling.py | 6 +- pus_tm/system/tcs.py | 7 +- pus_tm/tm_tcp_server.py | 8 +- tmtcc.py | 2 +- 41 files changed, 547 insertions(+), 1104 deletions(-) delete mode 100644 config/custom_hooks.py rename config/{hook_implementations.py => hook.py} (95%) diff --git a/.gitignore b/.gitignore index 47b4b46..59c8d29 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ log /gps_log.txt /config/*.json tmtc_conf.json +/seqcnt.txt diff --git a/config/custom_hooks.py b/config/custom_hooks.py deleted file mode 100644 index 450de42..0000000 --- a/config/custom_hooks.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -@brief This file exposes hook functions to the user. -@details Template configuration file. Copy this folder to the TMTC commander root and adapt - it to your needs. -""" -from spacepackets.ecss.tc import PusTelecommand - - -def command_preparation_hook() -> PusTelecommand: - """ - Can be used to pack user-defined commands by generating and returning a PusTelecommand - class instance - """ - return PusTelecommand(service=17, subservice=1, ssc=20) diff --git a/config/custom_mode_op.py b/config/custom_mode_op.py index 64b9033..7981371 100644 --- a/config/custom_mode_op.py +++ b/config/custom_mode_op.py @@ -5,7 +5,7 @@ """ import enum -from tmtccmd.core.backend import TmTcHandler +from tmtccmd import CcsdsTmtcBackend from tmtccmd.logging import get_console_logger LOGGER = get_console_logger() @@ -15,5 +15,5 @@ class CustomModeList(enum.IntEnum): pass -def custom_mode_operation(tmtc_backend: TmTcHandler, mode: int): +def custom_mode_operation(_tmtc_backend: CcsdsTmtcBackend, _mode: int): pass diff --git a/config/hook_implementations.py b/config/hook.py similarity index 95% rename from config/hook_implementations.py rename to config/hook.py index 3959114..af55692 100644 --- a/config/hook_implementations.py +++ b/config/hook.py @@ -30,7 +30,7 @@ class EiveHookObject(TmTcCfgHookBase): def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): from config.custom_mode_op import custom_mode_operation - custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend) + custom_mode_operation(tmtc_backend, mode) def get_object_ids(self) -> ObjectIdDictT: from config.object_ids import get_object_ids diff --git a/deps/tmtccmd b/deps/tmtccmd index 2890c62..7835963 160000 --- a/deps/tmtccmd +++ b/deps/tmtccmd @@ -1 +1 @@ -Subproject commit 2890c62e31999706ff1a7164b8a9f7cbc7bc34d5 +Subproject commit 7835963dfebc95237eec791b07a8faaf9185ba0b diff --git a/gomspace/gomspace_common.py b/gomspace/gomspace_common.py index 7a59471..aa859ff 100644 --- a/gomspace/gomspace_common.py +++ b/gomspace/gomspace_common.py @@ -10,9 +10,9 @@ import enum import struct from typing import Union +from spacepackets.ecss import PusTelecommand from tmtccmd.tc.pus_8_funccmd import generate_action_command -from tmtccmd.tc.definitions import PusTelecommand -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class GomspaceDeviceActionIds(enum.IntEnum): @@ -138,7 +138,7 @@ def pack_set_param_command( ) -def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand: +def pack_ping_command(object_id: ObjectIdU32, data: bytearray) -> PusTelecommand: """ " Function to generate the command to ping a gomspace device @param object_id Object Id of the gomspace device handler. @param data Bytearray containing the bytes to send to the gomspace device. For now the on board software @@ -153,7 +153,7 @@ def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand: ) -def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand: +def pack_gnd_wdt_reset_command(object_id: ObjectIdU32) -> PusTelecommand: """ " Function to generate the command to reset the watchdog of a gomspace device. @param object_id Object Id of the gomspace device handler. """ @@ -162,7 +162,7 @@ def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand: ) -def pack_reboot_command(object_id: ObjectId) -> PusTelecommand: +def pack_reboot_command(object_id: ObjectIdU32) -> PusTelecommand: """Function to generate the command which triggers a reboot of a gomspace device @param object_id The object id of the gomspace device handler. """ @@ -171,7 +171,7 @@ def pack_reboot_command(object_id: ObjectId) -> PusTelecommand: ) -def pack_request_full_hk_table_command(object_id: ObjectId) -> PusTelecommand: +def pack_request_full_hk_table_command(object_id: ObjectIdU32) -> PusTelecommand: """Function to generate the command to request the full housekeeping table from a gomspace device. @param object_id The object id of the gomspace device handler. diff --git a/pus_tc/cmd_definitions.py b/pus_tc/cmd_definitions.py index c35dbaa..3a3ad36 100644 --- a/pus_tc/cmd_definitions.py +++ b/pus_tc/cmd_definitions.py @@ -11,597 +11,364 @@ from pus_tc.devs.reaction_wheels import add_rw_cmds from pus_tc.devs.bpx_batt import BpxOpCodes from config.definitions import CustomServiceList -from tmtccmd.config import TmTcDefWrapper +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config.globals import get_default_tmtc_defs def get_eive_service_op_code_dict() -> TmTcDefWrapper: def_wrapper = get_default_tmtc_defs() - add_bpx_cmd_definitions(cmd_dict=def_wrapper) + add_bpx_cmd_definitions(defs=def_wrapper) add_core_controller_definitions(defs=def_wrapper) - add_pl_pcdu_cmds(cmd_dict=def_wrapper) - add_pcdu_cmds(cmd_dict=def_wrapper) - specify_rtd_cmds(cmd_dict=def_wrapper) - add_imtq_cmds(cmd_dict=def_wrapper) - add_rad_sens_cmds(cmd_dict=def_wrapper) - add_rw_cmds(cmd_dict=def_wrapper) - add_ploc_mpsoc_cmds(cmd_dict=def_wrapper) - add_ploc_supv_cmds(cmd_dict=def_wrapper) - add_system_cmds(cmd_dict=def_wrapper) - add_time_cmds(cmd_dict=def_wrapper) - add_syrlinks_cmds(cmd_dict=def_wrapper) - add_gps_cmds(cmd_dict=def_wrapper) - add_str_cmds(cmd_dict=def_wrapper) - add_ccsds_cmds(cmd_dict=def_wrapper) - add_pdec_cmds(cmd_dict=def_wrapper) - add_heater_cmds(cmd_dict=def_wrapper) - add_tmp_sens_cmds(cmd_dict=def_wrapper) - add_proc_cmds(cmd_dict=def_wrapper) + add_pl_pcdu_cmds(defs=def_wrapper) + add_pcdu_cmds(defs=def_wrapper) + specify_rtd_cmds(defs=def_wrapper) + add_imtq_cmds(defs=def_wrapper) + add_rad_sens_cmds(defs=def_wrapper) + add_rw_cmds(defs=def_wrapper) + add_ploc_mpsoc_cmds(defs=def_wrapper) + add_ploc_supv_cmds(defs=def_wrapper) + add_system_cmds(defs=def_wrapper) + add_time_cmds(defs=def_wrapper) + add_syrlinks_cmds(defs=def_wrapper) + add_gps_cmds(defs=def_wrapper) + add_str_cmds(defs=def_wrapper) + add_ccsds_cmds(defs=def_wrapper) + add_pdec_cmds(defs=def_wrapper) + add_heater_cmds(defs=def_wrapper) + add_tmp_sens_cmds(defs=def_wrapper) + add_proc_cmds(defs=def_wrapper) return def_wrapper def add_tmp_sens_cmds(defs: TmTcDefWrapper): - op_code_dict = { - "0": ("TMP1075 Tests", {OpCodeDictKeys.TIMEOUT: 2.2}), - } - service_tuple = ("TMP1075 1", op_code_dict) - cmd_dict[CustomServiceList.TMP1075_1.value] = service_tuple - service_tuple = ("TMP1075 2", op_code_dict) - cmd_dict[CustomServiceList.TMP1075_2.value] = service_tuple + oce = OpCodeEntry() + oce.add("0", "TMP1075 Tests") + defs.add_service(CustomServiceList.TMP1075_1.value, "TMP1075 1", oce) + defs.add_service(CustomServiceList.TMP1075_2.value, "TMP1075 2", oce) -def add_pdec_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_pdec_handler = { - "0": ("PDEC Handler: Print CLCW", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("PDEC Handler: Print PDEC monitor", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler) - cmd_dict[CustomServiceList.PDEC_HANDLER.value] = service_pdec_handler_tuple +def add_pdec_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "PDEC Handler: Print CLCW") + oce.add("1", "PDEC Handler: Print PDEC monitor") + defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce) -def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_ccsds_handler = { - "0": ("CCSDS Handler: Set low rate", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("CCSDS Handler: Set high rate", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("CCSDS Handler: Set arbitrary bitrate", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ( - "CCSDS Handler: Enable tx clock manipulator", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "6": ( - "CCSDS Handler: Disable tx clock manipulator", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "7": ( - "CCSDS Handler: Update tx data on rising edge", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "8": ( - "CCSDS Handler: Update tx data on falling edge", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - } - service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler) - cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple +def add_ccsds_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "CCSDS Handler: Set low rate") + oce.add("1", "CCSDS Handler: Set high rate") + oce.add("2", "CCSDS Handler: Enable transmitter") + oce.add("3", "CCSDS Handler: Disable transmitter") + oce.add("4", "CCSDS Handler: Set arbitrary bitrate") + oce.add("5", "CCSDS Handler: Enable tx clock manipulator") + oce.add("6", "CCSDS Handler: Disable tx clock manipulator") + oce.add("7", "CCSDS Handler: Update tx data on rising edge") + oce.add("8", "CCSDS Handler: Update tx data on falling edge") + defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce) -def add_str_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_star_tracker = { - "0": ( - "Star Tracker: Mode On, Submode Bootloader", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "1": ("Star Tracker: Mode On, Submode Firmware", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Star Tracker: Mode Normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("Star Tracker: Mode Off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("Star Tracker: Mode Raw", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("Star Tracker: Ping", {OpCodeDictKeys.TIMEOUT: 5.0}), - "6": ( - "Star Tracker: Switch to bootloader program", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "7": ("Star Tracker: Request temperature", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("Star Tracker: Request version", {OpCodeDictKeys.TIMEOUT: 2.0}), - "9": ("Star Tracker: Request interface", {OpCodeDictKeys.TIMEOUT: 2.0}), - "10": ("Star Tracker: Request power", {OpCodeDictKeys.TIMEOUT: 2.0}), - "11": ( - "Star Tracker: Set subscription parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "12": ( - "Star Tracker: Boot image (requires bootloader mode)", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "13": ("Star Tracker: Request time", {OpCodeDictKeys.TIMEOUT: 2.0}), - "14": ("Star Tracker: Request solution", {OpCodeDictKeys.TIMEOUT: 2.0}), - "15": ("Star Tracker: Upload image", {OpCodeDictKeys.TIMEOUT: 2.0}), - "16": ("Star Tracker: Download image", {OpCodeDictKeys.TIMEOUT: 2.0}), - "17": ("Star Tracker: Set limit parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "18": ("Star Tracker: Set tracking parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "19": ("Star Tracker: Set mounting parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "20": ("Star Tracker: Set camera parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "22": ( - "Star Tracker: Set centroiding parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "23": ("Star Tracker: Set LISA parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "24": ("Star Tracker: Set matching parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "25": ( - "Star Tracker: Set validation parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "26": ("Star Tracker: Set algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "27": ("Star Tracker: Take image", {OpCodeDictKeys.TIMEOUT: 2.0}), - "28": ("Star Tracker: Stop str helper", {OpCodeDictKeys.TIMEOUT: 2.0}), - "30": ( - "Star Tracker: Set name of download image", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "31": ("Star Tracker: Request histogram", {OpCodeDictKeys.TIMEOUT: 2.0}), - "32": ("Star Tracker: Request contrast", {OpCodeDictKeys.TIMEOUT: 2.0}), - "33": ("Star Tracker: Set json filename", {OpCodeDictKeys.TIMEOUT: 2.0}), - "35": ("Star Tracker: Flash read", {OpCodeDictKeys.TIMEOUT: 2.0}), - "36": ("Star Tracker: Set flash read filename", {OpCodeDictKeys.TIMEOUT: 2.0}), - "37": ("Star Tracker: Get checksum", {OpCodeDictKeys.TIMEOUT: 2.0}), - "49": ("Star Tracker: Request camera parameter", {OpCodeDictKeys.TIMEOUT: 2.0}), - "50": ("Star Tracker: Request limits", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ( - "Star Tracker: Set image processor parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "52": ( - "Star Tracker: (EGSE only) Load camera ground config ", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "53": ( - "Star Tracker: (EGSE only) Load camera flight config", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "54": ( - "Star Tracker: Request log level parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "55": ( - "Star Tracker: Request mounting parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "56": ( - "Star Tracker: Request image processor parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "57": ( - "Star Tracker: Request centroiding parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "58": ("Star Tracker: Request lisa parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "59": ( - "Star Tracker: Request matching parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "60": ( - "Star Tracker: Request tracking parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "61": ( - "Star Tracker: Request validation parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "62": ("Star Tracker: Request algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "63": ( - "Star Tracker: Request subscription parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "64": ( - "Star Tracker: Request log subscription parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "65": ( - "Star Tracker: Request debug camera parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "66": ("Star Tracker: Set log level parameters", {OpCodeDictKeys.TIMEOUT: 2.0}), - "67": ( - "Star Tracker: Set log subscription parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "68": ( - "Star Tracker: Set debug camera parameters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "69": ("Star Tracker: Firmware update", {OpCodeDictKeys.TIMEOUT: 2.0}), - "70": ( - "Star Tracker: Disable timestamp generation", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "71": ( - "Star Tracker: Enable timestamp generation", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - } - service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker) - cmd_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple +def add_str_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "Star Tracker: Mode On, Submode Bootloader") + oce.add("1", "Star Tracker: Mode On, Submode Firmware") + oce.add("2", "Star Tracker: Mode Normal") + oce.add("3", "Star Tracker: Mode Off") + oce.add("4", "Star Tracker: Mode Raw") + oce.add("5", "Star Tracker: Ping") + oce.add("6", "Star Tracker: Switch to bootloader program") + oce.add("7", "Star Tracker: Request temperature") + oce.add("8", "Star Tracker: Request version") + oce.add("9", "Star Tracker: Request interface") + oce.add("10", "Star Tracker: Request power") + oce.add("11", "Star Tracker: Set subscription parameters") + oce.add("12", "Star Tracker: Boot image (requires bootloader mode)") + oce.add("13", "Star Tracker: Request time") + oce.add("14", "Star Tracker: Request solution") + oce.add("15", "Star Tracker: Upload image") + oce.add("16", "Star Tracker: Download image") + oce.add("17", "Star Tracker: Set limit parameters") + oce.add("17", "Star Tracker: Set limit parameters") + oce.add("18", "Star Tracker: Set tracking parameters") + oce.add("19", "Star Tracker: Set mounting parameters") + oce.add("20", "Star Tracker: Set camera parameters") + oce.add("22", "Star Tracker: Set centroiding parameters") + oce.add("23", "Star Tracker: Set LISA parameters") + oce.add("24", "Star Tracker: Set matching parameters") + oce.add("25", "Star Tracker: Set validation parameters") + oce.add("26", "Star Tracker: Set algo parameters") + oce.add("27", "Star Tracker: Take image") + oce.add("28", "Star Tracker: Stop str helper") + oce.add("30", "Star Tracker: Set name of download image") + oce.add("31", "Star Tracker: Request histogram") + oce.add("32", "Star Tracker: Request contrast") + oce.add("33", "Star Tracker: Set json filename") + oce.add("35", "Star Tracker: Flash read") + oce.add("36", "Star Tracker: Set flash read filename") + oce.add("37", "Star Tracker: Get checksum") + oce.add("49", "Star Tracker: Request camera parameter") + oce.add("50", "Star Tracker: Request limits") + oce.add("51", "Star Tracker: Set image processor parameters") + oce.add("52", "Star Tracker: (EGSE only) Load camera ground config") + oce.add("53", "Star Tracker: (EGSE only) Load camera flight config") + oce.add("54", "Star Tracker: Request log level parameters") + oce.add("55", "Star Tracker: Request mounting parameters") + oce.add("56", "Star Tracker: Request image processor parameters") + oce.add("57", "Star Tracker: Request centroiding parameters") + oce.add("58", "Star Tracker: Request lisa parameters") + oce.add("59", "Star Tracker: Request matching parameters") + oce.add("60", "Star Tracker: Request tracking parameters") + oce.add("61", "Star Tracker: Request validation parameters") + oce.add("62", "Star Tracker: Request algo parameters") + oce.add("63", "Star Tracker: Request subscription parameters") + oce.add("64", "Star Tracker: Request log subscription parameters") + oce.add("65", "Star Tracker: Request debug camera parameters") + oce.add("66", "Star Tracker: Set log level parameters") + oce.add("67", "Star Tracker: Set log subscription parameters") + oce.add("68", "Star Tracker: Set debug camera parameters") + oce.add("69", "Star Tracker: Firmware update") + oce.add("70", "Star Tracker: Disable timestamp generation") + oce.add("71", "Star Tracker: Enable timestamp generation") + defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce) -def add_syrlinks_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_syrlinks_handler = { - "0": ("Syrlinks Handler: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("Syrlinks Handler: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Syrlinks Handler: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("Syrlinks Handler: Set TX standby", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("Syrlinks Handler: Set TX modulation", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("Syrlinks Handler: Set TX carrier wave", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("Syrlinks Handler: Read TX status", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("Syrlinks Handler: Read TX waveform", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ( - "Syrlinks Handler: Read TX AGC value high byte ", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "9": ( - "Syrlinks Handler: Read TX AGC value low byte ", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "12": ( - "Syrlinks Handler: Write LCL config", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "13": ( - "Syrlinks Handler: Read RX status registers", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "14": ( - "Syrlinks Handler: Read LCL config register", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "15": ( - "Syrlinks Handler: Set waveform OQPSK", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "16": ( - "Syrlinks Handler: Set waveform BPSK", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "17": ( - "Syrlinks Handler: Set second config", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "18": ( - "Syrlinks Handler: Enable debug output", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "19": ( - "Syrlinks Handler: Disable debug output", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - } - service_syrlinks_handler_tuple = ( - "Syrlinks Handler", - op_code_dict_srv_syrlinks_handler, +def add_syrlinks_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "Syrlinks Handler: Set mode off") + oce.add("1", "Syrlinks Handler: Set mode on") + oce.add("2", "Syrlinks Handler: Set mode normal") + oce.add("3", "Syrlinks Handler: Set TX standby") + oce.add("4", "Syrlinks Handler: Set TX modulation") + oce.add("5", "Syrlinks Handler: Set TX carrier wave") + oce.add("6", "Syrlinks Handler: Read TX status") + oce.add("7", "Syrlinks Handler: Read TX waveform") + oce.add("8", "Syrlinks Handler: Read TX AGC value high byte") + oce.add("9", "Syrlinks Handler: Read TX AGC value low byte") + oce.add("12", "Syrlinks Handler: Write LCL config") + oce.add("13", "Syrlinks Handler: Read RX status registers") + oce.add("14", "Syrlinks Handler: Read LCL config register") + oce.add("15", "Syrlinks Handler: Set waveform OQPSK") + oce.add("16", "Syrlinks Handler: Set waveform BPSK") + oce.add("17", "Syrlinks Handler: Set second config") + oce.add("18", "Syrlinks Handler: Enable debug output") + oce.add("19", "Syrlinks Handler: Disable debug output") + defs.add_service(CustomServiceList.SYRLINKS.value, "Syrlinks Handler", oce) + + +def add_bpx_cmd_definitions(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=BpxOpCodes.HK, info="Request BPX HK") + oce.add(keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count") + oce.add(keys=BpxOpCodes.REQUEST_CFG, info="Request Configuration Struct (Step 1)") + oce.add( + keys=BpxOpCodes.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)" ) - cmd_dict[CustomServiceList.SYRLINKS.value] = service_syrlinks_handler_tuple - - -def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=BpxOpCodes.HK, info="Request BPX HK" - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count" - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=BpxOpCodes.REQUEST_CFG, - info="Request Configuration Struct (Step 1)", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=BpxOpCodes.REQUEST_CFG_HK, - info="Request Configuration Struct HK (Step 2)", - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=BpxOpCodes.REBOOT, info="Reboot Command" - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + oce.add(keys=BpxOpCodes.REBOOT, info="Reboot Command") + defs.add_service( name=CustomServiceList.BPX_BATTERY.value, info="BPX Battery Handler", - op_code_entry=op_code_dict, + op_code_entry=oce, ) -def add_time_cmds(cmd_dict: ServiceOpCodeDictT): +def add_time_cmds(defs: TmTcDefWrapper): from pus_tc.system.time import OpCodes, Info - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, + oce = OpCodeEntry() + oce.add( keys=OpCodes.SET_CURRENT_TIME, info=Info.SET_CURRENT_TIME, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.TIME.value, info="Time Service", - op_code_entry=op_code_dict, + op_code_entry=oce, ) -def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_imtq = { - "0": ("Mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("Mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}), - "9": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), - "10": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}), - "11": ("IMTQ get engineering hk set", {OpCodeDictKeys.TIMEOUT: 2.0}), - "12": ( - "IMTQ get calibrated MTM measurement one shot", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "13": ("IMTQ get raw MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq) - cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple +def add_imtq_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "Mode Off") + oce.add("1", "Mode On") + oce.add("2", "Mode Normal") + oce.add("3", "IMTQ perform pos X self test") + oce.add("4", "IMTQ perform neg X self test") + oce.add("5", "IMTQ perform pos Y self test") + oce.add("6", "IMTQ perform neg Y self test") + oce.add("7", "IMTQ perform pos Z self test") + oce.add("8", "IMTQ perform neg Z self test") + oce.add("9", "IMTQ command dipole") + oce.add("10", "IMTQ get commanded dipole") + oce.add("11", "IMTQ get engineering hk set") + oce.add("12", "IMTQ get calibrated MTM measurement one shot") + oce.add("13", "IMTQ get raw MTM measurement one shot") + defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce) -def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_srv_ploc_mpsoc = { - "0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("Ploc MPSoC: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("Ploc MPSoC: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}), - "4": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}), - "8": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}), - "9": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "10": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "11": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}), - "12": ("Ploc MPSoC: OBSW reset sequence count", {OpCodeDictKeys.TIMEOUT: 2.0}), - "13": ("Ploc MPSoC: Read DEADBEEF address", {OpCodeDictKeys.TIMEOUT: 2.0}), - "14": ("Ploc MPSoC: Mode replay", {OpCodeDictKeys.TIMEOUT: 2.0}), - "15": ("Ploc MPSoC: Mode idle", {OpCodeDictKeys.TIMEOUT: 2.0}), - "16": ("Ploc MPSoC: Tc cam command send", {OpCodeDictKeys.TIMEOUT: 2.0}), - "17": ("Ploc MPSoC: Set UART TX tristate", {OpCodeDictKeys.TIMEOUT: 2.0}), - "18": ("Ploc MPSoC: Relesase UART TX", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc) - cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple +def add_ploc_mpsoc_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "Ploc MPSoC: Set mode off") + oce.add("1", "Ploc MPSoC: Set mode on") + oce.add("2", "Ploc MPSoC: Set mode normal") + oce.add("3", "Ploc MPSoC: Memory write") + oce.add("4", "Ploc MPSoC: Memory read") + oce.add("5", "Ploc MPSoC: Flash write") + oce.add("6", "Ploc MPSoC: Flash delete") + oce.add("7", "Ploc MPSoC: Replay start") + oce.add("8", "Ploc MPSoC: Replay stop") + oce.add("9", "Ploc MPSoC: Downlink pwr on") + oce.add("10", "Ploc MPSoC: Downlink pwr off") + oce.add("11", "Ploc MPSoC: Replay write sequence") + oce.add("12", "Ploc MPSoC: OBSW reset sequence count") + oce.add("13", "Ploc MPSoC: Read DEADBEEF address") + oce.add("14", "Ploc MPSoC: Mode replay") + oce.add("15", "Ploc MPSoC: Mode idle") + oce.add("16", "Ploc MPSoC: Tc cam command send") + oce.add("17", "Ploc MPSoC: Set UART TX tristate") + oce.add("18", "Ploc MPSoC: Relesase UART TX") + defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce) -def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict_ploc_mem_dumper = { - "0": ("PLOC Memory Dumper: MRAM dump", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_ploc_memory_dumper_tuple = ( - "PLOC Memory Dumper", - op_code_dict_ploc_mem_dumper, +def add_ploc_supv_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add("0", "PLOC Memory Dumper: MRAM dump") + defs.add_service( + CustomServiceList.PLOC_MEMORY_DUMPER.value, "PLOC Memory Dumper", oce ) - - op_code_dict_srv_ploc_supv = { - "0": ("PLOC Supervisor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}), - "1": ("PLOC Supervisor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}), - "2": ("PLOC Supervisor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}), - "3": ("PLOC Supervisor: Get HK Report", {OpCodeDictKeys.TIMEOUT: 2.0}), - "5": ("PLOC Supervisor: Start MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), - "6": ("PLOC Supervisor: Shutdown MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), - "7": ( - "PLOC Supervisor: Select MPSoC boot image", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "8": ("PLOC Supervisor: Set max restart tries", {OpCodeDictKeys.TIMEOUT: 2.0}), - "9": ("PLOC Supervisor: Reset MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}), - "10": ("PLOC Supervisor: Set time reference", {OpCodeDictKeys.TIMEOUT: 2.0}), - "11": ("PLOC Supervisor: Set boot timeout", {OpCodeDictKeys.TIMEOUT: 2.0}), - "12": ("PLOC Supervisor: Disable Hk", {OpCodeDictKeys.TIMEOUT: 2.0}), - "13": ( - "PLOC Supervisor: Request boot status report", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "17": ("PLOC Supervisor: Enable latchup alert", {OpCodeDictKeys.TIMEOUT: 2.0}), - "18": ("PLOC Supervisor: Disable latchup alert", {OpCodeDictKeys.TIMEOUT: 2.0}), - "20": ("PLOC Supervisor: Set alert limit", {OpCodeDictKeys.TIMEOUT: 2.0}), - "23": ( - "PLOC Supervisor: Set ADC enabled channels", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "24": ( - "PLOC Supervisor: Set ADC window and stride", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "25": ("PLOC Supervisor: Set ADC threshold", {OpCodeDictKeys.TIMEOUT: 2.0}), - "26": ( - "PLOC Supervisor: Request latchup status report", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "27": ("PLOC Supervisor: Copy ADC data to MRAM", {OpCodeDictKeys.TIMEOUT: 2.0}), - "30": ("PLOC Supervisor: Run auto EM tests", {OpCodeDictKeys.TIMEOUT: 2.0}), - "31": ("PLOC Supervisor: MRAM Wipe", {OpCodeDictKeys.TIMEOUT: 2.0}), - "35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), - "36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), - "37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}), - "38": ( - "PLOC Supervisor: Factory reset clear all", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "39": ( - "PLOC Supervisor: Factory reset clear mirror entries", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "40": ( - "PLOC Supervisor: Factory reset clear circular entries", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "42": ("PLOC Supervisor: Perform update", {OpCodeDictKeys.TIMEOUT: 2.0}), - "43": ( - "PLOC Supervisor: Terminate supervisor process", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "44": ("PLOC Supervisor: Start MPSoC quiet", {OpCodeDictKeys.TIMEOUT: 2.0}), - "45": ("PLOC Supervisor: Set shutdown timeout", {OpCodeDictKeys.TIMEOUT: 2.0}), - "46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}), - "47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), - "48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}), - "51": ( - "PLOC Supervisor: Logging request event buffers", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "52": ( - "PLOC Supervisor: Logging clear counters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}), - "54": ( - "PLOC Supervisor: Logging request counters", - {OpCodeDictKeys.TIMEOUT: 2.0}, - ), - "55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}), - "56": ("PLOC Supervisor: Reset PL", {OpCodeDictKeys.TIMEOUT: 2.0}), - "57": ("PLOC Supervisor: Enable NVMs", {OpCodeDictKeys.TIMEOUT: 2.0}), - "58": ("PLOC Supervisor: Continue update", {OpCodeDictKeys.TIMEOUT: 2.0}), - } - service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) - cmd_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple - cmd_dict[ - CustomServiceList.PLOC_MEMORY_DUMPER.value - ] = service_ploc_memory_dumper_tuple + oce = OpCodeEntry() + oce.add("1", "PLOC Supervisor: Set mode off") + oce.add("2", "PLOC Supervisor: Set mode normal") + oce.add("3", "PLOC Supervisor: Get HK Report") + oce.add("5", "PLOC Supervisor: Start MPSoC") + oce.add("6", "PLOC Supervisor: Shutdown MPSoC") + oce.add("7", "PLOC Supervisor: Select MPSoC boot image") + oce.add("8", "PLOC Supervisor: Set max restart tries") + oce.add("9", "PLOC Supervisor: Reset MPSoC") + oce.add("10", "PLOC Supervisor: Set time reference") + oce.add("11", "PLOC Supervisor: Set boot timeout") + oce.add("12", "PLOC Supervisor: Disable Hk") + oce.add("13", "PLOC Supervisor: Request boot status report") + oce.add("17", "PLOC Supervisor: Enable latchup alert") + oce.add("18", "PLOC Supervisor: Disable latchup alert") + oce.add("20", "PLOC Supervisor: Set alert limit") + oce.add("23", "PLOC Supervisor: Set ADC enabled channels") + oce.add("24", "PLOC Supervisor: Set ADC window and stride") + oce.add("25", "PLOC Supervisor: Set ADC threshold") + oce.add("26", "PLOC Supervisor: Request latchup status report") + oce.add("27", "PLOC Supervisor: Copy ADC data to MRAM") + oce.add("30", "PLOC Supervisor: Run auto EM tests") + oce.add("31", "PLOC Supervisor: MRAM Wipe") + oce.add("35", "PLOC Supervisor: Set GPIO") + oce.add("36", "PLOC Supervisor: Read GPIO") + oce.add("37", "PLOC Supervisor: Restart supervisor") + oce.add("38", "PLOC Supervisor: Factory reset clear all") + oce.add("39", "PLOC Supervisor: Factory reset clear mirror entries") + oce.add("40", "PLOC Supervisor: Factory reset clear circular entries") + oce.add("42", "PLOC Supervisor: Perform update") + oce.add("43", "PLOC Supervisor: Terminate supervisor process") + oce.add("44", "PLOC Supervisor: Start MPSoC quiet") + oce.add("45", "PLOC Supervisor: Set shutdown timeout") + oce.add("46", "PLOC Supervisor: Factory flash") + oce.add("47", "PLOC Supervisor: Enable auto TM") + oce.add("48", "PLOC Supervisor: Disable auto TM") + oce.add("51", "PLOC Supervisor: Logging request event buffers") + oce.add("52", "PLOC Supervisor: Logging clear counters") + oce.add("53", "PLOC Supervisor: Logging set topic") + oce.add("54", "PLOC Supervisor: Logging request counters") + oce.add("55", "PLOC Supervisor: Request ADC Report") + oce.add("56", "PLOC Supervisor: Reset PL") + oce.add("57", "PLOC Supervisor: Enable NVMs") + oce.add("58", "PLOC Supervisor: Continue update") + defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce) -def add_system_cmds(cmd_dict: ServiceOpCodeDictT): +def add_system_cmds(defs: TmTcDefWrapper): from pus_tc.system.acs import AcsOpCodes, SusOpCodes import pus_tc.system.tcs as tcs import pus_tc.system.controllers as controllers - default_opts = generate_op_code_options( - enter_listener_mode=False, custom_timeout=8.0 - ) - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, + oce = OpCodeEntry() + oce.add( keys=AcsOpCodes.ACS_ASS_A_SIDE, info="Switch to ACS board A side", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_B_SIDE, info="Switch to ACS board B side", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_DUAL_MODE, info="Switch to ACS board dual mode", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_A_ON, info="Switch ACS board A side on", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_B_ON, info="Switch ACS board B side on", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_DUAL_ON, info="Switch ACS board dual mode on", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=AcsOpCodes.ACS_ASS_OFF, info="Switch off ACS board", - options=default_opts, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.ACS_ASS.value, - info="ACS Assemblies", - op_code_entry=op_code_dict, + defs.add_service( + name=CustomServiceList.ACS_ASS.value, info="ACS Assemblies", op_code_entry=oce ) - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, + oce = OpCodeEntry() + oce.add( keys=SusOpCodes.SUS_ASS_NOM_SIDE, info="Switch SUS board to nominal side", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=SusOpCodes.SUS_ASS_RED_SIDE, info="Switch SUS board to redundant side", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=SusOpCodes.SUS_ASS_OFF, info="Switch off SUS board", - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=SusOpCodes.SUS_ASS_DUAL_MODE, info="Switch SUS board to dual mode", - options=default_opts, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.SUS_ASS.value, info="SUS Assembly", - op_code_entry=op_code_dict, + op_code_entry=oce, ) - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, + oce = OpCodeEntry() + oce.add( keys=tcs.OpCodes.TCS_BOARD_ASS_NORMAL, info=tcs.Info.TCS_BOARD_ASS_NORMAL, - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=tcs.OpCodes.TCS_BOARD_ASS_OFF, info=tcs.Info.TCS_BOARD_ASS_OFF, - options=default_opts, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.TCS_ASS.value, info="TCS Board Assembly", - op_code_entry=op_code_dict, + op_code_entry=oce, ) - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, + oce = OpCodeEntry() + oce.add( keys=controllers.OpCodes.THERMAL_CONTROLLER, info=controllers.Info.THERMAL_CONTROLLER, - options=default_opts, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=controllers.OpCodes.CORE_CONTROLLER, info=controllers.Info.CORE_CONTROLLER, - options=default_opts, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.CONTROLLERS.value, info="Controllers", - op_code_entry=op_code_dict, + op_code_entry=oce, ) diff --git a/pus_tc/devs/acu.py b/pus_tc/devs/acu.py index 55163cc..dae5d7f 100644 --- a/pus_tc/devs/acu.py +++ b/pus_tc/devs/acu.py @@ -6,6 +6,7 @@ import struct from config.definitions import CustomServiceList +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( @@ -19,7 +20,7 @@ from gomspace.gomspace_common import Info as GsInfo from config.object_ids import ACU_HANDLER_ID from pus_tc.devs.p60dock import P60DockConfigTable from tmtccmd.tc.pus_8_funccmd import generate_action_command -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class ACUConfigTable: @@ -47,43 +48,37 @@ class Info: TEST = "ACU Test" -def add_acu_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, +def add_acu_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add( keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.SET_PARAM, info=GsInfo.SET_PARAMETER, ) - add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - op_code_entry=op_code_dict, + oce.add(keys=OpCodes.TEST, info=Info.TEST) + defs.add_service( name=CustomServiceList.ACU.value, info="ACU Device", + op_code_entry=oce, ) -def pack_acu_commands(object_id: ObjectId, q: QueueHelper, op_code: str) -> TcQueueT: +def pack_acu_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd("Handling ACU command") if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: q.add_log_cmd("ACU: Print channel stats") @@ -124,8 +119,6 @@ def pack_acu_commands(object_id: ObjectId, q: QueueHelper, op_code: str) -> TcQu ) pack_test_cmds(object_id=object_id, q=q) - return q - class ACUTestProcedure: """ @@ -150,7 +143,7 @@ class ACUTestProcedure: off = False -def pack_test_cmds(object_id: ObjectId, q: QueueHelper): +def pack_test_cmds(object_id: ObjectIdU32, q: QueueHelper): if ACUTestProcedure.all or ACUTestProcedure.reboot: q.add_log_cmd("ACU: Reboot") q.add_pus_tc(gs.pack_reboot_command(object_id)) diff --git a/pus_tc/devs/ccsds_handler.py b/pus_tc/devs/ccsds_handler.py index 8b21825..5983bae 100644 --- a/pus_tc/devs/ccsds_handler.py +++ b/pus_tc/devs/ccsds_handler.py @@ -9,7 +9,7 @@ import struct from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class CommandIds: @@ -32,7 +32,7 @@ class CommandIds: UPDATE_ON_FALLING_EDGE = 8 -def pack_ccsds_handler_test(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_ccsds_handler_test(object_id: ObjectIdU32, q: QueueHelper, op_code: str): obyt = object_id.as_bytes q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") if op_code == "0": diff --git a/pus_tc/devs/gps.py b/pus_tc/devs/gps.py index 1cc39ed..1ca454d 100644 --- a/pus_tc/devs/gps.py +++ b/pus_tc/devs/gps.py @@ -1,6 +1,7 @@ import enum from config.definitions import CustomServiceList +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.logging import get_console_logger @@ -23,19 +24,14 @@ class SetIds: HK = 0 -def add_gps_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - op_code_entry=op_code_dict, +def add_gps_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS) + oce.add(keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK) + defs.add_service( name=CustomServiceList.GPS_CTRL.value, info="GPS/GNSS Controller", + op_code_entry=oce, ) diff --git a/pus_tc/devs/heater.py b/pus_tc/devs/heater.py index 664cc13..e2317fb 100644 --- a/pus_tc/devs/heater.py +++ b/pus_tc/devs/heater.py @@ -7,8 +7,9 @@ import enum from config.definitions import CustomServiceList from config.object_ids import get_object_ids +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper -from tmtccmd.utility.obj_id import ObjectId +from tmtccmd.utility.obj_id import ObjectIdU32 from tmtccmd.tc.pus_201_fsfw_health import ( pack_set_health_cmd_data, FsfwHealth, @@ -52,31 +53,16 @@ class ActionIds(enum.IntEnum): SWITCH_HEATER = 0 -def add_heater_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.HEATER_HEALTHY_CMD, - info=Info.HEATER_HEALTHY_CMD, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.HEATER_EXT_CTRL, - info=Info.HEATER_EXT_CTRL, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.HEATER_FAULTY_CMD, - info=Info.HEATER_FAULTY_CMD, - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, +def add_heater_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD) + oce.add(keys=OpCodes.HEATER_HEALTHY_CMD, info=Info.HEATER_HEALTHY_CMD) + oce.add(keys=OpCodes.HEATER_EXT_CTRL, info=Info.HEATER_EXT_CTRL) + oce.add(keys=OpCodes.HEATER_FAULTY_CMD, info=Info.HEATER_FAULTY_CMD) + defs.add_service( name=CustomServiceList.HEATER.value, info="Heater Device", - op_code_entry=op_code_dict, + op_code_entry=oce, ) @@ -133,7 +119,7 @@ def pack_heater_cmds(object_id: bytearray, op_code: str, q: QueueHelper): ) -def heater_idx_to_obj(heater: int) -> ObjectId: +def heater_idx_to_obj(heater: int) -> ObjectIdU32: from config.object_ids import ( HEATER_0_OBC_BRD, HEATER_1_PLOC_PROC_BRD, @@ -158,7 +144,7 @@ def heater_idx_to_obj(heater: int) -> ObjectId: obj_dict = get_object_ids() obj_id_obj = obj_dict.get(obj_id_array[heater]) if obj_id_obj is None: - return ObjectId.from_bytes(obj_id_array[heater]) + return ObjectIdU32.from_bytes(obj_id_array[heater]) return obj_id_obj @@ -187,7 +173,7 @@ def prompt_heater() -> int: def health_cmd( q: QueueHelper, heater_idx: int, - object_id: ObjectId, + object_id: ObjectIdU32, health: FsfwHealth, health_str: str, ): diff --git a/pus_tc/devs/imtq.py b/pus_tc/devs/imtq.py index 7105185..d8c0661 100644 --- a/pus_tc/devs/imtq.py +++ b/pus_tc/devs/imtq.py @@ -15,7 +15,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import ( generate_one_hk_command, ) from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class ImtqSetIds: @@ -44,7 +44,7 @@ class ImtqActionIds: read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) -def pack_imtq_test_into(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_imtq_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd( f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/devs/p60dock.py b/pus_tc/devs/p60dock.py index 31e671a..869fc8c 100644 --- a/pus_tc/devs/p60dock.py +++ b/pus_tc/devs/p60dock.py @@ -5,15 +5,8 @@ @author J. Meier @date 13.12.2020 """ -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.packer import TcQueueT - from tmtccmd.tc import QueueHelper -from tmtccmd.tc.pus_3_fsfw_hk import ( - generate_one_hk_command, - make_sid, - generate_one_diag_command, -) +from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from gomspace.gomspace_common import * from config.object_ids import P60_DOCK_HANDLER @@ -89,7 +82,7 @@ class P60DockHkTable: wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) -def pack_p60dock_cmds(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_p60dock_cmds(object_id: ObjectIdU32, q: QueueHelper, op_code: str): objb = object_id.as_bytes if op_code in P60OpCodes.STACK_3V3_ON: q.add_log_cmd(Info.STACK_3V3_ON) diff --git a/pus_tc/devs/pcdu.py b/pus_tc/devs/pcdu.py index 02432c8..a726869 100644 --- a/pus_tc/devs/pcdu.py +++ b/pus_tc/devs/pcdu.py @@ -1,10 +1,5 @@ from config.definitions import CustomServiceList -from tmtccmd.config import ( - ServiceOpCodeDictT, - add_op_code_entry, - add_service_op_code_entry, - OpCodeDictKeys, -) +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info from pus_tc.devs.pdu1 import Pdu1OpCodes @@ -13,288 +8,137 @@ from pus_tc.devs.acu import add_acu_cmds from gomspace.gomspace_common import Info as GsInfo -def add_p60_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_ON, - info=Info.STACK_3V3_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_3V3_OFF, - info=Info.STACK_3V3_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_ON, - info=Info.STACK_5V_ON, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=P60OpCodes.STACK_5V_OFF, - info=Info.STACK_5V_OFF, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, +def add_p60_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=P60OpCodes.STACK_3V3_ON, info=Info.STACK_3V3_ON) + oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=Info.STACK_3V3_OFF) + oce.add(keys=P60OpCodes.STACK_5V_ON, info=Info.STACK_5V_ON) + oce.add(keys=P60OpCodes.STACK_5V_OFF, info=Info.STACK_5V_OFF) + oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE) + oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE) + oce.add( keys=GomspaceOpCodes.PRINT_SWITCH_V_I, info="P60 Dock: Print Switches, Voltages, Currents", ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="P60 Dock: Print Latchups", - ) - add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests") - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.P60DOCK.value, - info="P60 Device", - op_code_entry=op_code_dict, + oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="P60 Dock: Print Latchups") + oce.add(keys=P60OpCodes.TEST, info="P60 Tests") + defs.add_service( + name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce ) -def add_pdu1_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_OFF.value, - info="PDU1: Turn TCS board off", +def add_pdu1_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=Pdu1OpCodes.TCS_BOARD_OFF.value, info="PDU1: Turn TCS board off") + oce.add(keys=Pdu1OpCodes.STAR_TRACKER_ON.value, info="PDU1: Turn star tracker on") + oce.add(keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, info="PDU1: Turn star tracker off") + oce.add(keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, info="PDU1: Turn SUS nominal on") + oce.add(keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, info="PDU1: Turn SUS nominal off") + oce.add(keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, info="PDU1: Turn ACS A side on") + oce.add(keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, info="PDU1: Turn ACS A side off") + oce.add(keys=Pdu1OpCodes.SYRLINKS_ON.value, info="PDU1: Turn Syrlinks on") + oce.add(keys=Pdu1OpCodes.SYRLINKS_OFF.value, info="PDU1: Turn Syrlinks off") + oce.add(keys=Pdu1OpCodes.MGT_ON.value, info="PDU1: Turn MGT on") + oce.add(keys=Pdu1OpCodes.MGT_OFF.value, info="PDU1: Turn MGT off") + oce.add(keys=Pdu1OpCodes.PLOC_ON.value, info="PDU1: Turn PLOC on") + oce.add(keys=Pdu1OpCodes.PLOC_OFF.value, info="PDU1: Turn PLOC off") + oce.add(keys=Pdu1OpCodes.SCEX_ON.value, info="PDU1: Turn Solar Cell Experiment on") + oce.add( + keys=Pdu1OpCodes.SCEX_OFF.value, info="PDU1: Turn Solar Cell Experiment off" ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_ON.value, - info="PDU1: Turn star tracker on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, - info="PDU1: Turn star tracker off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, - info="PDU1: Turn SUS nominal on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, - info="PDU1: Turn SUS nominal off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, - info="PDU1: Turn ACS A side on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, - info="PDU1: Turn ACS A side off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_ON.value, - info="PDU1: Turn Syrlinks on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SYRLINKS_OFF.value, - info="PDU1: Turn Syrlinks off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_ON.value, - info="PDU1: Turn MGT on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.MGT_OFF.value, - info="PDU1: Turn MGT off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_ON.value, - info="PDU1: Turn PLOC on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.PLOC_OFF.value, - info="PDU1: Turn PLOC off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_ON.value, - info="PDU1: Turn Solar Cell Experiment on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.SCEX_OFF.value, - info="PDU1: Turn Solar Cell Experiment off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, - info=GsInfo.REQUEST_CORE_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, - info=GsInfo.REQUEST_AUX_HK_ONCE, - ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE) + oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE) + oce.add( keys=GomspaceOpCodes.PRINT_SWITCH_V_I, info="PDU1: Print Switches, Voltages, Currents", ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu1OpCodes.TCS_BOARD_ON.value, - info="PDU1: Turn TCS board on", - ) - - add_op_code_entry( - op_code_dict=op_code_dict, - keys=GomspaceOpCodes.PRINT_LATCHUPS, - info="PDU1: Print Latchups", - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=GomspaceOpCodes.SET_PARAM, info="Set parameter" - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + oce.add(keys=Pdu1OpCodes.TCS_BOARD_ON.value, info="PDU1: Turn TCS board on") + oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="PDU1: Print Latchups") + oce.add(keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests") + oce.add(keys=GomspaceOpCodes.SET_PARAM, info="Set parameter") + defs.add_service( name=CustomServiceList.PDU1.value, info="PDU1 Device", - op_code_entry=op_code_dict, + op_code_entry=oce, ) -def add_pdu2_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests") - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, - info="PDU2: Turn ACS Side B on", +def add_pdu2_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys="0", info="PDU2 Tests") + oce.add(keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, info="PDU2: Turn ACS Side B on") + oce.add(keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, info="PDU2: Turn ACS Side B off") + oce.add(keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, info="PDU2: Turn SUS redundant on") + oce.add( + keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, info="PDU2: Turn SUS redundant off" ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, - info="PDU2: Turn ACS Side B off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, - info="PDU2: Turn SUS redundant on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, - info="PDU2: Turn SUS redundant off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_ON.value, - info="PDU2: Turn reaction wheels on", - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=Pdu2OpCodes.RW_OFF.value, - info="PDU2: Turn reaction wheels off", - ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add(keys=Pdu2OpCodes.RW_ON.value, info="PDU2: Turn reaction wheels on") + oce.add(keys=Pdu2OpCodes.RW_OFF.value, info="PDU2: Turn reaction wheels off") + oce.add( keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value, info="PDU2: PL PCDU Switch Channel Nominal (1) on", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value, info="PDU2: PL PCDU Switch Channel Nominal (1) off", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value, info="PDU2: PL PCDU Switch Channel Redundant (1) on", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value, info="PDU2: PL PCDU Switch Channel Redundant (1) off", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value, info="PDU2: Switch TCS Heater Input on", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value, info="PDU2: Switch TCS Heater Input off", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value, info="PDU2: Switch Solar Array Deployment On", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value, info="PDU2: Switch Solar Array Deployment Off", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.PL_CAMERA_ON.value, info="PDU2: Turn payload camera on", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=Pdu2OpCodes.PL_CAMERA_OFF.value, info="PDU2: Turn payload camera off", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.PRINT_SWITCH_V_I, info="PDU2: Print Switches, Voltages, Currents", - options={OpCodeDictKeys.TIMEOUT: 2.0}, ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=GomspaceOpCodes.PRINT_LATCHUPS, info="PDU2: Print Latchups", ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name="pdu2", info="PDU2 Device", - op_code_entry=op_code_dict, + op_code_entry=oce, ) -def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - add_p60_cmds(cmd_dict) - add_pdu1_cmds(cmd_dict) - add_pdu2_cmds(cmd_dict) - add_acu_cmds(cmd_dict) +def add_pcdu_cmds(defs: TmTcDefWrapper): + add_p60_cmds(defs) + add_pdu1_cmds(defs) + add_pdu2_cmds(defs) + add_acu_cmds(defs) diff --git a/pus_tc/devs/pdu1.py b/pus_tc/devs/pdu1.py index c80f1a6..2fc8ac6 100644 --- a/pus_tc/devs/pdu1.py +++ b/pus_tc/devs/pdu1.py @@ -55,7 +55,7 @@ class PDU1TestProcedure: turn_channel_3_off = False -def pack_pdu1_commands(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_pdu1_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd("Commanding PDU1") objb = object_id.as_bytes if op_code == Pdu1OpCodes.TCS_BOARD_ON.value: diff --git a/pus_tc/devs/pdu2.py b/pus_tc/devs/pdu2.py index 60f7268..1ce4842 100644 --- a/pus_tc/devs/pdu2.py +++ b/pus_tc/devs/pdu2.py @@ -65,7 +65,7 @@ class PDU2TestProcedure: request_hk_table = False -def pack_pdu2_commands(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_pdu2_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd("Testing PDU2") objb = object_id.as_bytes if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value: diff --git a/pus_tc/devs/ploc_memory_dumper.py b/pus_tc/devs/ploc_memory_dumper.py index 85f27fa..6b89119 100644 --- a/pus_tc/devs/ploc_memory_dumper.py +++ b/pus_tc/devs/ploc_memory_dumper.py @@ -10,14 +10,14 @@ import struct from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class ActionIds: DUMP_MRAM = 1 -def pack_ploc_memory_dumper_cmd(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_ploc_memory_dumper_cmd(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd( f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/devs/ploc_mpsoc.py b/pus_tc/devs/ploc_mpsoc.py index 7e3497c..57e2175 100644 --- a/pus_tc/devs/ploc_mpsoc.py +++ b/pus_tc/devs/ploc_mpsoc.py @@ -9,12 +9,10 @@ import struct import enum -from tmtccmd.config.definitions import QueueCommands from tmtccmd.logging import get_console_logger -from tmtccmd.tc.packer import TcQueueT from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from utility.input_helper import InputHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes @@ -68,9 +66,7 @@ class PlocReplyIds(enum.IntEnum): TM_CAM_CMD_RPT = 19 -def pack_ploc_mpsoc_commands( - object_id: ObjectId, q: QueueHelper, op_code: str -) -> TcQueueT: +def pack_ploc_mpsoc_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd( f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/devs/ploc_supervisor.py b/pus_tc/devs/ploc_supervisor.py index 545385e..a608aea 100644 --- a/pus_tc/devs/ploc_supervisor.py +++ b/pus_tc/devs/ploc_supervisor.py @@ -12,7 +12,7 @@ from spacepackets.ecss.tc import PusTelecommand from tmtccmd.logging import get_console_logger from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from utility.input_helper import InputHelper LOGGER = get_console_logger() @@ -106,9 +106,7 @@ class SupvHkIds: BOOT_STATUS_REPORT = 53 -def pack_ploc_supv_commands( - object_id: ObjectId, q: QueueHelper, op_code: str -) -> TcQueueT: +def pack_ploc_supv_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") obyt = object_id.as_bytes if op_code == "0": diff --git a/pus_tc/devs/plpcdu.py b/pus_tc/devs/plpcdu.py index 77462b3..a5b47ea 100644 --- a/pus_tc/devs/plpcdu.py +++ b/pus_tc/devs/plpcdu.py @@ -4,17 +4,11 @@ import time from typing import Optional from config.definitions import CustomServiceList -from tmtccmd.config import ( - QueueCommands, - ServiceOpCodeDictT, - add_op_code_entry, - add_service_op_code_entry, -) -from tmtccmd.tc.definitions import TcQueueT +from tmtccmd.config import TmTcDefWrapper +from tmtccmd.config.tmtc import OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( - generate_one_hk_command, make_sid, generate_one_diag_command, ) @@ -120,83 +114,39 @@ class ParamIds(enum.IntEnum): INJECT_ALL_ON_FAILURE = 35 -def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_SSR, - info=Info.NORMAL_SSR, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_DRO, - info=Info.NORMAL_DRO, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_X8, - info=Info.NORMAL_X8, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_TX, - info=Info.NORMAL_TX, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_MPA, - info=Info.NORMAL_MPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.NORMAL_HPA, - info=Info.NORMAL_HPA, - ) - add_op_code_entry( - op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK - ) - add_op_code_entry( - op_code_dict=op_code_dict, +def add_pl_pcdu_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON) + oce.add(keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF) + oce.add(keys=OpCodes.NORMAL_SSR, info=Info.NORMAL_SSR) + oce.add(keys=OpCodes.NORMAL_DRO, info=Info.NORMAL_DRO) + oce.add(keys=OpCodes.NORMAL_X8, info=Info.NORMAL_X8) + oce.add(keys=OpCodes.NORMAL_TX, info=Info.NORMAL_TX) + oce.add(keys=OpCodes.NORMAL_MPA, info=Info.NORMAL_MPA) + oce.add(keys=OpCodes.NORMAL_HPA, info=Info.NORMAL_HPA) + oce.add(keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK) + oce.add( keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE, info="Inject failure SSR to DRO transition", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=OpCodes.INJECT_DRO_TO_X8_FAILURE, info="Inject failure in DRO to X8 transition", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=OpCodes.INJECT_X8_TO_TX_FAILURE, info="Inject failure in X8 to TX transition", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=OpCodes.INJECT_TX_TO_MPA_FAILURE, info="Inject failure in TX to MPA transition", ) - add_op_code_entry( - op_code_dict=op_code_dict, + oce.add( keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE, info="Inject failure in MPA to HPA transition", ) - add_op_code_entry( - op_code_dict=op_code_dict, - keys=OpCodes.INJECT_ALL_ON_FAILURE, - info="Inject failure in all on mode", - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - name=CustomServiceList.PL_PCDU.value, - info="PL PCDU", - op_code_entry=op_code_dict, - ) + oce.add(keys=OpCodes.INJECT_ALL_ON_FAILURE, info="Inject failure in all on mode") + defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce) def pack_pl_pcdu_commands(q: QueueHelper, op_code: str): @@ -439,11 +389,9 @@ def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int: ) -def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): +def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str): wait_time = request_wait_time() - tc_queue.appendleft( - (QueueCommands.PRINT, f"Updating {print_str} wait time to {wait_time}") - ) + q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}") if wait_time is None: return param_data = pack_scalar_double_param_app_data( @@ -452,8 +400,7 @@ def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str): unique_id=param_id, parameter=wait_time, ) - cmd = pack_fsfw_load_param_cmd(app_data=param_data) - tc_queue.appendleft(cmd.pack_command_tuple()) + q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str): diff --git a/pus_tc/devs/rad_sensor.py b/pus_tc/devs/rad_sensor.py index 3886721..898062a 100644 --- a/pus_tc/devs/rad_sensor.py +++ b/pus_tc/devs/rad_sensor.py @@ -11,9 +11,10 @@ from config.definitions import CustomServiceList from spacepackets.ecss.tc import PusTelecommand from pus_tc.service_200_mode import pack_mode_data, Modes +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class SetIds: @@ -45,30 +46,22 @@ class CommandIds: DISABLE_DEBUG_OUTPUT = 5 -def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT): - - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, info=Info.ON, keys=OpCodes.ON) - add_op_code_entry(op_code_dict=op_code_dict, info=Info.OFF, keys=OpCodes.OFF) - add_op_code_entry(op_code_dict=op_code_dict, info=Info.NORMAL, keys=OpCodes.NORMAL) - add_op_code_entry( - op_code_dict=op_code_dict, info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE - ) - add_op_code_entry( - op_code_dict=op_code_dict, info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON - ) - add_op_code_entry( - op_code_dict=op_code_dict, info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, +def add_rad_sens_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(info=Info.ON, keys=OpCodes.ON) + oce.add(info=Info.OFF, keys=OpCodes.OFF) + oce.add(info=Info.NORMAL, keys=OpCodes.NORMAL) + oce.add(info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE) + oce.add(info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON) + oce.add(info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF) + defs.add_service( name=CustomServiceList.RAD_SENSOR.value, info="Radiation Sensor", - op_code_entry=op_code_dict, + op_code_entry=oce, ) -def pack_rad_sensor_test_into(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}") if op_code in OpCodes.ON: @@ -94,7 +87,7 @@ def pack_rad_sensor_test_into(object_id: ObjectId, q: QueueHelper, op_code: str) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) -def rad_sensor_mode_cmd(object_id: ObjectId, mode: Modes, info: str, q: QueueHelper): +def rad_sensor_mode_cmd(object_id: ObjectIdU32, mode: Modes, info: str, q: QueueHelper): q.add_log_cmd(f"Rad sensor: {info}") mode_data = pack_mode_data(object_id.as_bytes, mode, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) diff --git a/pus_tc/devs/reaction_wheels.py b/pus_tc/devs/reaction_wheels.py index 591b69f..8153e49 100644 --- a/pus_tc/devs/reaction_wheels.py +++ b/pus_tc/devs/reaction_wheels.py @@ -6,6 +6,7 @@ """ import struct +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( generate_one_hk_command, @@ -74,57 +75,42 @@ class RampTime: MS_1000 = 1000 -def add_rw_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry( - op_code_dict=op_code_dict, info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED - ) - add_op_code_entry(op_code_dict=op_code_dict, info=InfoDevs.ON, keys=OpCodesDevs.ON) - add_op_code_entry( - op_code_dict=op_code_dict, info=InfoDevs.OFF, keys=OpCodesDevs.OFF - ) - add_op_code_entry( - op_code_dict=op_code_dict, info=InfoDevs.NML, keys=OpCodesDevs.NML - ) - add_op_code_entry( - op_code_dict=op_code_dict, info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS - ) - add_op_code_entry( - op_code_dict=op_code_dict, info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, +def add_rw_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED) + oce.add(info=InfoDevs.ON, keys=OpCodesDevs.ON) + oce.add(info=InfoDevs.OFF, keys=OpCodesDevs.OFF) + oce.add(info=InfoDevs.NML, keys=OpCodesDevs.NML) + oce.add(info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS) + oce.add(info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM) + defs.add_service( name=CustomServiceList.REACTION_WHEEL_1.value, - op_code_entry=op_code_dict, info="Reaction Wheel 1", + op_code_entry=oce, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.REACTION_WHEEL_2.value, - op_code_entry=op_code_dict, info="Reaction Wheel 2", + op_code_entry=oce, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.REACTION_WHEEL_3.value, - op_code_entry=op_code_dict, info="Reaction Wheel 3", + op_code_entry=oce, ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + defs.add_service( name=CustomServiceList.REACTION_WHEEL_4.value, - op_code_entry=op_code_dict, info="Reaction Wheel 4", + op_code_entry=oce, ) - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.ON, keys=OpCodesAss.ON) - add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.NML, keys=OpCodesAss.NML) - add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.OFF, keys=OpCodesAss.OFF) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + oce = OpCodeEntry() + oce.add(info=InfoAss.ON, keys=OpCodesAss.ON) + oce.add(info=InfoAss.NML, keys=OpCodesAss.NML) + oce.add(info=InfoAss.OFF, keys=OpCodesAss.OFF) + defs.add_service( name=CustomServiceList.RW_ASSEMBLY.value, - op_code_entry=op_code_dict, info="Reaction Wheel Assembly", + op_code_entry=oce, ) diff --git a/pus_tc/devs/rtd.py b/pus_tc/devs/rtd.py index 669c0d5..27e5862 100644 --- a/pus_tc/devs/rtd.py +++ b/pus_tc/devs/rtd.py @@ -3,9 +3,10 @@ from typing import Optional from config.definitions import CustomServiceList from pus_tc.devs.pdec_handler import CommandIds from spacepackets.ecss import PusTelecommand +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices import config.object_ids as oids from config.object_ids import get_object_ids @@ -44,20 +45,17 @@ class Info: WIRTE_CONFIG = "Write config" -def specify_rtd_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() - add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.ON, info=Info.ON) - add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.NORMAL, info=Info.NORMAL) - add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.OFF, info=Info.OFF) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, - op_code_entry=op_code_dict, - name=CustomServiceList.RTD.value, - info="RTD commands", +def specify_rtd_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodes.ON, info=Info.ON) + oce.add(keys=OpCodes.NORMAL, info=Info.NORMAL) + oce.add(keys=OpCodes.OFF, info=Info.OFF) + defs.add_service( + name=CustomServiceList.RTD.value, info="RTD commands", op_code_entry=oce ) -def pack_rtd_commands(op_code: str, object_id: Optional[ObjectId], q: QueueHelper): +def pack_rtd_commands(op_code: str, object_id: Optional[ObjectIdU32], q: QueueHelper): if object_id is not None and object_id not in RTD_IDS: print("Specified object ID not a valid RTD ID") object_id = None diff --git a/pus_tc/devs/star_tracker.py b/pus_tc/devs/star_tracker.py index 9345b68..382ac18 100644 --- a/pus_tc/devs/star_tracker.py +++ b/pus_tc/devs/star_tracker.py @@ -12,7 +12,7 @@ from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.logging import get_console_logger from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from utility.input_helper import InputHelper @@ -150,7 +150,7 @@ class Submode: FIRMWARE = 2 -def pack_star_tracker_commands(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_star_tracker_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd( f"Generate command for star tracker with object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/devs/str_img_helper.py b/pus_tc/devs/str_img_helper.py index 266011b..b53c42b 100644 --- a/pus_tc/devs/str_img_helper.py +++ b/pus_tc/devs/str_img_helper.py @@ -12,7 +12,7 @@ import struct from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc import QueueHelper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class Commands: @@ -24,7 +24,7 @@ class ImagePathDefs: uploadFile = "/mnt/sd0/startracker/gemma.bin" -def pack_str_img_helper_command(object_id: ObjectId, q: QueueHelper, op_code: str): +def pack_str_img_helper_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): q.add_log_cmd( f"Testing star tracker image helper object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/devs/syrlinks_hk_handler.py b/pus_tc/devs/syrlinks_hk_handler.py index 6e919b6..287a620 100644 --- a/pus_tc/devs/syrlinks_hk_handler.py +++ b/pus_tc/devs/syrlinks_hk_handler.py @@ -5,17 +5,13 @@ @author J. Meier @date 13.12.2020 """ - -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import TcQueueT - from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from spacepackets.ecss.tc import PusTelecommand from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes import struct -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class SetIds: @@ -41,9 +37,7 @@ class CommandIds: DISABLE_DEBUG = 21 -def pack_syrlinks_command( - object_id: ObjectId, q: QueueHelper, op_code: str -) -> TcQueueT: +def pack_syrlinks_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): obyt = object_id.as_bytes q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}") if op_code == "0": diff --git a/pus_tc/devs/tmp1075.py b/pus_tc/devs/tmp1075.py index ce095fc..59d7208 100644 --- a/pus_tc/devs/tmp1075.py +++ b/pus_tc/devs/tmp1075.py @@ -9,7 +9,7 @@ from spacepackets.ecss.tc import PusTelecommand from pus_tc.service_200_mode import pack_mode_data from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 class Tmp1075TestProcedure: @@ -33,7 +33,7 @@ class Tmp1075ActionIds: start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02]) -def pack_tmp1075_test_into(object_id: ObjectId, op_code: str, q: QueueHelper): +def pack_tmp1075_test_into(object_id: ObjectIdU32, op_code: str, q: QueueHelper): q.add_log_cmd( f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}" ) diff --git a/pus_tc/procedure_packer.py b/pus_tc/procedure_packer.py index 6c7c8e9..7e7d6e6 100644 --- a/pus_tc/procedure_packer.py +++ b/pus_tc/procedure_packer.py @@ -81,7 +81,7 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper): if service == CoreServiceList.SERVICE_5.value: return pack_generic_service_5_test_into(q=queue_helper) if service == CoreServiceList.SERVICE_17.value: - queue_helper.add_pus_tc(pack_service_17_ping_command()) + return queue_helper.add_pus_tc(pack_service_17_ping_command()) if service == CoreServiceList.SERVICE_200.value: return pack_service200_test_into(q=queue_helper) if service == CustomServiceList.P60DOCK.value: @@ -201,23 +201,3 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper): if service == CustomServiceList.RW_ASSEMBLY.value: return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code) LOGGER.warning(f"Invalid Service {service}") - - -def pre_tc_send_cb( - queue_entry: Union[bytes, QueueCommands], - com_if: CommunicationInterface, - queue_info: Union[PusTelecommand, any], - file_logger: logging.Logger, -): - if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray): - log_raw_pus_tc( - packet=queue_entry, - srv_subservice=(queue_info.service, queue_info.subservice), - ) - tc_info_string = f"Sent {queue_info}" - LOGGER.info(tc_info_string) - file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") - com_if.send(data=queue_entry) - elif isinstance(queue_entry, QueueCommands): - if queue_entry == QueueCommands.PRINT: - file_logger.info(queue_info) diff --git a/pus_tc/prompt_parameters.py b/pus_tc/prompt_parameters.py index feca256..b9dde69 100644 --- a/pus_tc/prompt_parameters.py +++ b/pus_tc/prompt_parameters.py @@ -10,9 +10,8 @@ from PyQt5.QtWidgets import ( from PyQt5 import QtCore - +from tmtccmd.config import CoreModeList from tmtccmd.core.globals_manager import get_global -from tmtccmd.config.definitions import CoreGlobalIds, CoreModeList class Parameter: @@ -93,32 +92,29 @@ class ParameterDialog(QDialog): """ -def prompt_parameters(parameterList): - gui = get_global(CoreGlobalIds.GUI) - mode = get_global(CoreGlobalIds.MODE) - - # gui only works in cont mode right now - if gui and mode == CoreModeList.CONTINUOUS_MODE: - return _gui_prompt(parameterList) - else: - return _cli_prompt(parameterList) +def prompt_parameters_gui(param_list) -> dict: + return _gui_prompt(param_list) -def _gui_prompt(parameterList): +def prompt_parameters_cli(param_list) -> dict: + return _cli_prompt(param_list) + + +def _gui_prompt(param_list) -> dict: dialog = ParameterDialog() - for parameter in parameterList: + for parameter in param_list: dialog.addParameter(parameter["name"], parameter["defaultValue"]) dialog.exec_() return dialog.getParameters() -def _cli_prompt(parameterList): +def _cli_prompt(param_list) -> dict: result = {} - for parameter in parameterList: - userInput = input( + for parameter in param_list: + user_input = input( "Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"]) ) - if userInput == "": - userInput = parameter["defaultValue"] - result[parameter["name"]] = userInput + if user_input == "": + user_input = parameter["defaultValue"] + result[parameter["name"]] = user_input return result diff --git a/pus_tc/system/controllers.py b/pus_tc/system/controllers.py index eaf3854..4078641 100644 --- a/pus_tc/system/controllers.py +++ b/pus_tc/system/controllers.py @@ -1,11 +1,11 @@ from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_200_fsfw_modes import Modes -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from .common import command_mode import config.object_ids as obj_ids -from pus_tc.prompt_parameters import prompt_parameters +from pus_tc.prompt_parameters import prompt_parameters_cli, prompt_parameters_gui class OpCodes: @@ -18,13 +18,15 @@ class Info: CORE_CONTROLLER = "ACS controller" -def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId): - parameters = prompt_parameters( - [ - {"name": "Mode", "defaultValue": "2"}, - {"name": "Submode", "defaultValue": "0"}, - ] - ) +def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui: bool): + param_list = [ + {"name": "Mode", "defaultValue": "2"}, + {"name": "Submode", "defaultValue": "0"}, + ] + if gui: + parameters = prompt_parameters_gui(param_list) + else: + parameters = prompt_parameters_cli(param_list) mode = int(parameters["Mode"]) if mode < 0 or mode > 2: print("Invalid Mode, defaulting to OFF") @@ -39,7 +41,7 @@ def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId): ) -def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId): +def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32): command_mode( object_id=object_id.as_bytes, mode=Modes.OFF, @@ -49,7 +51,7 @@ def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId): ) -def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId): +def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32): command_mode( object_id=object_id.as_bytes, mode=Modes.ON, @@ -59,7 +61,7 @@ def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId): ) -def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectId): +def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectIdU32): command_mode( object_id=object_id.as_bytes, mode=Modes.NORMAL, diff --git a/pus_tc/system/core.py b/pus_tc/system/core.py index 3531534..b23f67e 100644 --- a/pus_tc/system/core.py +++ b/pus_tc/system/core.py @@ -7,8 +7,10 @@ from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.logging import get_console_logger from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command +from tmtccmd.config.tmtc import OpCodeEntry from config.object_ids import CORE_CONTROLLER_ID + LOGGER = get_console_logger() @@ -66,60 +68,48 @@ class Copy(enum.IntEnum): def add_core_controller_definitions(defs: TmTcDefWrapper): - od = dict() - add_op_code_entry(op_code_dict=od, keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC) - add_op_code_entry(op_code_dict=od, keys=OpCodes.REBOOT_FULL, info=Info.REBOOT_FULL) - add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_SELF, info="Reboot Self") - add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_0_0, info="Reboot 0 0") - add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_0_1, info="Reboot 0 1") - add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_1_0, info="Reboot 1 0") - add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_1_1, info="Reboot 1 1") - add_op_code_entry( - op_code_dict=od, + oce = OpCodeEntry() + oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC) + oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC) + oce.add(keys=OpCodes.REBOOT_FULL, info=Info.REBOOT_FULL) + oce.add(keys=OpCodes.XSC_REBOOT_SELF, info="Reboot Self") + oce.add(keys=OpCodes.XSC_REBOOT_0_0, info="Reboot 0 0") + oce.add(keys=OpCodes.XSC_REBOOT_0_1, info="Reboot 0 1") + oce.add(keys=OpCodes.XSC_REBOOT_1_0, info="Reboot 1 0") + oce.add(keys=OpCodes.XSC_REBOOT_1_1, info="Reboot 1 1") + oce.add( keys=OpCodes.GET_HK, info="Request housekeeping set", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.ENABLE_REBOOT_FILE_HANDLING, info="Enable reboot file handling", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.DISABLE_REBOOT_FILE_HANDLING, info="Disable reboot file handling", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.RESET_ALL_REBOOT_COUNTERS, info="Reset all reboot counters", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.RESET_REBOOT_COUNTER_00, info="Reset reboot counter 0 0", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.RESET_REBOOT_COUNTER_01, info="Reset reboot counter 0 1", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.RESET_REBOOT_COUNTER_10, info="Reset reboot counter 1 0", ) - add_op_code_entry( - op_code_dict=od, + oce.add( keys=OpCodes.RESET_REBOOT_COUNTER_11, info="Reset reboot counter 1 1", ) - add_service_op_code_entry( - srv_op_code_dict=defs, - name=CustomServiceList.CORE.value, - info="Core Controller", - op_code_entry=od, - ) + defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) def pack_core_commands(q: QueueHelper, op_code: str): diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py index 84e0209..782a12e 100644 --- a/pus_tc/system/proc.py +++ b/pus_tc/system/proc.py @@ -7,6 +7,7 @@ from typing import List from config.definitions import CustomServiceList from config.object_ids import get_object_ids from pus_tc.system.tcs import pack_tcs_sys_commands +from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.tc import QueueHelper from tmtccmd.tc.pus_11_tc_sched import ( @@ -125,17 +126,14 @@ def generic_print(q: QueueHelper, info: dict): q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})") -def add_proc_cmds(cmd_dict: ServiceOpCodeDictT): - op_code_dict = dict() +def add_proc_cmds(defs: TmTcDefWrapper): + oce = OpCodeEntry() for proc_entry in PROC_INFO_DICT.values(): - add_op_code_entry( - op_code_dict=op_code_dict, keys=proc_entry[0], info=proc_entry[1] - ) - add_service_op_code_entry( - srv_op_code_dict=cmd_dict, + oce.add(keys=proc_entry[0], info=proc_entry[1]) + defs.add_service( name=CustomServiceList.PROCEDURE.value, info="TV Test Procedures", - op_code_entry=op_code_dict, + op_code_entry=oce, ) diff --git a/pus_tm/action_reply_handler.py b/pus_tm/action_reply_handler.py index 9c2392c..b56cc3e 100644 --- a/pus_tm/action_reply_handler.py +++ b/pus_tm/action_reply_handler.py @@ -6,7 +6,6 @@ from pus_tc.devs.ploc_supervisor import SupvActionIds from pus_tc.devs.star_tracker import StarTrackerActionIds from gomspace.gomspace_common import GomspaceDeviceActionIds from tmtccmd.logging import get_console_logger -from tmtccmd.config.definitions import DataReplyUnpacked from tmtccmd.tm import Service8FsfwTm from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter @@ -91,7 +90,6 @@ def handle_ploc_replies( def handle_supervisor_replies( action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray ): - reply = DataReplyUnpacked() if action_id == SupvActionIds.DUMP_MRAM: header_list = ["MRAM Dump"] content_list = [custom_data[: len(custom_data)]] diff --git a/pus_tm/devs/gyros.py b/pus_tm/devs/gyros.py index 8fe6c79..48af1b1 100644 --- a/pus_tm/devs/gyros.py +++ b/pus_tm/devs/gyros.py @@ -1,7 +1,7 @@ import struct from pus_tm.defs import PrintWrapper -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from pus_tc.devs.gyros import L3gGyroSetIds, AdisGyroSetIds @@ -9,7 +9,7 @@ import config.object_ids as obj_ids def handle_gyros_hk_data( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if object_id.as_bytes in [ obj_ids.GYRO_0_ADIS_HANDLER_ID, @@ -28,7 +28,7 @@ def handle_gyros_hk_data( def handle_adis_gyro_hk( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if set_id == AdisGyroSetIds.CORE_HK: pw = PrintWrapper(printer) @@ -58,7 +58,7 @@ def handle_adis_gyro_hk( def handle_l3g_gyro_hk( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if set_id == L3gGyroSetIds.CORE_HK: pw = PrintWrapper(printer) diff --git a/pus_tm/devs/mgms.py b/pus_tm/devs/mgms.py index 48360ea..2e0380e 100644 --- a/pus_tm/devs/mgms.py +++ b/pus_tm/devs/mgms.py @@ -2,13 +2,13 @@ import struct from pus_tm.defs import PrintWrapper from pus_tc.devs.mgms import MgmRm3100SetIds, MgmLis3SetIds -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter import config.object_ids as obj_ids def handle_mgm_hk_data( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if object_id.as_bytes in [ obj_ids.MGM_0_LIS3_HANDLER_ID, @@ -24,7 +24,7 @@ def handle_mgm_hk_data( def handle_mgm_lis3_hk_data( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if set_id == MgmLis3SetIds.CORE_HK: pw = PrintWrapper(printer) @@ -41,7 +41,7 @@ def handle_mgm_lis3_hk_data( def handle_mgm_rm3100_hk_data( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if set_id == MgmRm3100SetIds.CORE_HK: pw = PrintWrapper(printer) diff --git a/pus_tm/devs/reaction_wheels.py b/pus_tm/devs/reaction_wheels.py index e1e1c28..2597357 100644 --- a/pus_tm/devs/reaction_wheels.py +++ b/pus_tm/devs/reaction_wheels.py @@ -1,11 +1,11 @@ import struct from pus_tm.defs import PrintWrapper, FsfwTmTcPrinter -from tmtccmd.utility.obj_id import ObjectId +from tmtccmd.utility.obj_id import ObjectIdU32 def handle_rw_hk_data( - printer: FsfwTmTcPrinter, object_id: ObjectId, set_id: int, hk_data: bytes + printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes ): from pus_tc.devs.reaction_wheels import RwSetIds diff --git a/pus_tm/devs/sus.py b/pus_tm/devs/sus.py index 0067998..c3a323a 100644 --- a/pus_tm/devs/sus.py +++ b/pus_tm/devs/sus.py @@ -2,12 +2,12 @@ import struct from pus_tm.defs import PrintWrapper from pus_tc.devs.sus import SetIds -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter def handle_sus_hk( - object_id: ObjectId, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int + object_id: ObjectIdU32, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int ): pw = PrintWrapper(printer) pw.dlog(f"Received SUS HK data from {object_id}") diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index 2aefb26..904475b 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -9,7 +9,7 @@ from tmtccmd.tm.pus_3_fsfw_hk import ( HkContentType, Service3FsfwTm, ) -from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT +from tmtccmd.utility.obj_id import ObjectIdU32, ObjectIdDictT from tmtccmd.logging import get_console_logger from pus_tm.devs.bpx_bat import handle_bpx_hk_data @@ -72,14 +72,14 @@ def handle_hk_packet( def handle_regular_hk_print( printer: FsfwTmTcPrinter, - object_id: ObjectId, + object_id: ObjectIdU32, hk_packet: Service3Base, hk_data: bytes, ): objb = object_id.as_bytes set_id = hk_packet.set_id """This function is called when a Service 3 Housekeeping packet is received.""" - if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: + if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: handle_rw_hk_data(printer, object_id, set_id, hk_data) elif objb == obj_ids.SYRLINKS_HANDLER_ID: handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id) diff --git a/pus_tm/system/tcs.py b/pus_tm/system/tcs.py index f6e9ec0..50ccd2c 100644 --- a/pus_tm/system/tcs.py +++ b/pus_tm/system/tcs.py @@ -3,7 +3,7 @@ import struct from pus_tm.defs import PrintWrapper from pus_tm.tcp_server_objects import * -from tmtccmd.utility import ObjectId +from tmtccmd.utility import ObjectIdU32 from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter @@ -14,7 +14,7 @@ class SetIds(enum.IntEnum): def handle_thermal_controller_hk_data( - object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes + object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes ): if set_id == SetIds.SENSOR_TEMPERATURE_SET: pw = PrintWrapper(printer) @@ -42,8 +42,7 @@ def handle_thermal_controller_hk_data( "TMP1075 1": tm_data[16], "TMP1075 2": tm_data[17], } - - # print(parsed_data) + printer.file_logger.info(str(parsed_data)) tcp_server_sensor_temperatures.report_parsed_hk_data( object_id, set_id, parsed_data ) diff --git a/pus_tm/tm_tcp_server.py b/pus_tm/tm_tcp_server.py index 2c4031a..4d3f243 100644 --- a/pus_tm/tm_tcp_server.py +++ b/pus_tm/tm_tcp_server.py @@ -4,7 +4,7 @@ import json import base64 from tmtccmd.logging import get_console_logger -from tmtccmd.utility.obj_id import ObjectId +from tmtccmd.utility.obj_id import ObjectIdU32 from dle_encoder import DleEncoder LOGGER = get_console_logger() @@ -64,7 +64,7 @@ class TmTcpServer: self.client_connection.close() self.client_connection = None - def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes): + def report_raw_hk_data(self, object_id: ObjectIdU32, set_id: int, hk_data: bytes): data_dict = { "type": "TM", @@ -76,7 +76,9 @@ class TmTcpServer: self._send_dictionary_over_socket(data_dict) - def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary): + def report_parsed_hk_data( + self, object_id: ObjectIdU32, set_id: int, data_dictionary + ): data_dict = { "type": "TM", "tmType": "Parsed HK", diff --git a/tmtcc.py b/tmtcc.py index 4d53773..cafd65d 100644 --- a/tmtcc.py +++ b/tmtcc.py @@ -53,7 +53,7 @@ from tmtccmd.config.args import ( ) from config import __version__ from config.definitions import PUS_APID -from config.hook_implementations import EiveHookObject +from config.hook import EiveHookObject from pus_tm.factory_hook import pus_factory_hook from pus_tc.procedure_packer import handle_default_procedure