From 02d9d6adfc093fe10dc144fb53084e843c516835 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 22 Nov 2023 11:21:26 +0100 Subject: [PATCH] OOF2 --- eive_tmtc/config/hook.py | 25 ++- eive_tmtc/pus_tc/procedure_packer.py | 3 +- eive_tmtc/tmtc/acs/acs_ctrl.py | 2 +- eive_tmtc/tmtc/payload/ploc_mpsoc.py | 83 ++++----- eive_tmtc/tmtc/payload/ploc_supervisor.py | 203 +++++++++------------- eive_tmtc/tmtc/payload/scex.py | 53 +++--- tmtcc.py | 4 + 7 files changed, 168 insertions(+), 205 deletions(-) diff --git a/eive_tmtc/config/hook.py b/eive_tmtc/config/hook.py index f088ded..444d032 100644 --- a/eive_tmtc/config/hook.py +++ b/eive_tmtc/config/hook.py @@ -1,14 +1,16 @@ from typing import Optional -from tmtccmd import CcsdsTmtcBackend, HookBase +from eive_tmtc.config.definitions import SPACE_PACKET_IDS +from eive_tmtc.tmtc.payload.ploc_supervisor import create_ploc_supv_node +from eive_tmtc.tmtc.payload.scex import create_scex_node +from eive_tmtc.tmtc.time import create_time_node +from eive_tmtc.tmtc.payload.ploc_mpsoc import create_ploc_mpsoc_node +from tmtccmd import HookBase, CcsdsTmtcBackend from tmtccmd.com import ComInterface from tmtccmd.config import CmdTreeNode -from tmtccmd.util import ObjectIdDictT, RetvalDictT -from eive_tmtc.config.definitions import SPACE_PACKET_IDS from eive_tmtc.config.retvals import get_retval_dict -from eive_tmtc.tmtc.time import create_time_node -from eive_tmtc.tmtc.acs.acs_ctrl import create_acs_ctrl_node +from tmtccmd.util import ObjectIdDictT, RetvalDictT class EiveHookObject(HookBase): @@ -41,6 +43,10 @@ class EiveHookObject(HookBase): gyro_devs = CmdTreeNode("gyro_devs", "Gyro Devices") acs_brd_assy_node.add_child(mgm_devs) acs_brd_assy_node.add_child(gyro_devs) + acs_ctrl = CmdTreeNode("acs_ctrl", "ACS Controller") + acs_ctrl.add_child(mode_node) + acs_ctrl.add_child(action_node) + acs_ctrl.add_child(param_node) rws = CmdTreeNode("rws", "Reaction Wheel Devices") rw_assy = CmdTreeNode("rw_assy", "Reaction Wheel Assembly") rw_1 = CmdTreeNode("rw_1", "Reaction Wheel 1") @@ -63,7 +69,7 @@ class EiveHookObject(HookBase): gnss_devs = CmdTreeNode("gnss", "GNSS Devices") acs_node.add_child(acs_brd_assy_node) - acs_node.add_child(create_acs_ctrl_node()) + acs_node.add_child(acs_ctrl) acs_node.add_child(mode_node) acs_node.add_child(rws) acs_node.add_child(mgt) @@ -104,10 +110,11 @@ class EiveHookObject(HookBase): eps_node.add_child(bat_node) payload_node = CmdTreeNode("payload", "Payload Subsystem") - scex_node = CmdTreeNode("scex", "SCEX devices") - scex_node.add_child(dev_node) pl_pcdu = CmdTreeNode("pl_pcdu", "Payload PCDU") payload_node.add_child(pl_pcdu) + payload_node.add_child(create_scex_node()) + payload_node.add_child(create_ploc_mpsoc_node()) + payload_node.add_child(create_ploc_supv_node()) obdh_node = CmdTreeNode("obdh", "OBDH Subsystem") xiphos_wdt = CmdTreeNode("wdt", "Xiphos WDT") @@ -127,8 +134,8 @@ class EiveHookObject(HookBase): def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: from tmtccmd.config.com import ( - create_com_interface_cfg_default, create_com_interface_default, + create_com_interface_cfg_default, ) assert self.cfg_path is not None diff --git a/eive_tmtc/pus_tc/procedure_packer.py b/eive_tmtc/pus_tc/procedure_packer.py index 308c1bb..52e3daa 100644 --- a/eive_tmtc/pus_tc/procedure_packer.py +++ b/eive_tmtc/pus_tc/procedure_packer.py @@ -13,7 +13,7 @@ from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands from eive_tmtc.tmtc.payload.scex import pack_scex_cmds from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands from eive_tmtc.tmtc.time import pack_time_management_cmd -from tmtccmd import DefaultProcedureInfo, TcHandlerBase +from tmtccmd import DefaultProcedureInfo from tmtccmd.tmtc import DefaultPusQueueHelper from eive_tmtc.tmtc.power.p60dock import pack_p60dock_cmds @@ -71,7 +71,6 @@ from eive_tmtc.utility.input_helper import InputHelper def handle_default_procedure( # noqa C901: Complexity okay here. - tc_base: TcHandlerBase, info: DefaultProcedureInfo, queue_helper: DefaultPusQueueHelper, ): diff --git a/eive_tmtc/tmtc/acs/acs_ctrl.py b/eive_tmtc/tmtc/acs/acs_ctrl.py index 06e635f..def521b 100644 --- a/eive_tmtc/tmtc/acs/acs_ctrl.py +++ b/eive_tmtc/tmtc/acs/acs_ctrl.py @@ -5,7 +5,7 @@ import math import socket import struct from socket import AF_INET -from typing import Dict, Tuple +from typing import Tuple from tmtccmd.config.tmtc import ( CmdTreeNode, diff --git a/eive_tmtc/tmtc/payload/ploc_mpsoc.py b/eive_tmtc/tmtc/payload/ploc_mpsoc.py index 9a8e0a5..9953b9f 100644 --- a/eive_tmtc/tmtc/payload/ploc_mpsoc.py +++ b/eive_tmtc/tmtc/payload/ploc_mpsoc.py @@ -15,13 +15,13 @@ from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.object_ids import get_object_ids, PLOC_MPSOC_ID from eive_tmtc.pus_tm.defs import PrintWrapper from tmtccmd.config.tmtc import ( + CmdTreeNode, tmtc_definitions_provider, OpCodeEntry, TmtcDefinitionWrapper, ) from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.tmtc import DefaultPusQueueHelper from eive_tmtc.utility.input_helper import InputHelper from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s8_fsfw_action import create_action_cmd @@ -128,6 +128,8 @@ class Info: REPLAY_WRITE_SEQ = "Replay write sequence" DOWNLINK_PWR_ON = "Downlink Power On" DOWNLINK_PWR_OFF = "Downlink Power Off" + MEM_WRITE = "Write to Memory" + MEM_READ = "Read from Memory" REPLAY_START = "Replay Start" CAM_TAKE_PIC = "Cam Take Picture" SIMPLEX_SEND_FILE = "Simplex Send File" @@ -143,6 +145,18 @@ class MemAddresses(enum.IntEnum): DEADBEEF = 0x40000004 +def create_ploc_mpsoc_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + ploc_mpsoc = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC") + for op_code, info in combined_dict.items(): + ploc_mpsoc.add_child(CmdTreeNode(op_code, info)) + return ploc_mpsoc + + @tmtc_definitions_provider def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() @@ -174,31 +188,29 @@ def add_ploc_mpsoc_cmds(defs: TmtcDefinitionWrapper): defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce) -@service_provider(CustomServiceList.PLOC_MPSOC) -def pack_ploc_mpsoc_commands( # noqa C901 - p: ServiceProviderParams, +def pack_ploc_mpsoc_commands( + q: DefaultPusQueueHelper, cmd_str: str ): # noqa C901: Complexity okay here. object_id = get_object_ids().get(PLOC_MPSOC_ID) - q = p.queue_helper + assert object_id is not None prefix = "PLOC MPSoC" - op_code = p.op_code q.add_log_cmd( f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" ) obyt = object_id.as_bytes - if op_code == OpCode.OFF: + if cmd_str == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(obyt, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code == OpCode.ON: + if cmd_str == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") data = pack_mode_data(obyt, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) - if op_code == OpCode.NORMAL: + if cmd_str == OpCode.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NORMAL}") data = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) - if op_code == OpCode.MEM_WRITE: + if cmd_str == OpCode.MEM_WRITE: q.add_log_cmd("PLOC MPSoC: TC mem write test") memory_address = int( input("PLOC MPSoC: Tc Mem Write: Type memory address: 0x"), 16 @@ -210,39 +222,35 @@ def pack_ploc_mpsoc_commands( # noqa C901 object_id.as_bytes, memory_address, memory_data, mem_len ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "4": - q.add_log_cmd("PLOC MPSoC: TC mem read test") - data = prepare_mem_read_command(object_id=object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.FLASH_WRITE_FILE: + if cmd_str == OpCode.FLASH_WRITE_FILE: q.add_log_cmd(f"{prefix}: {Info.FLASH_WRITE_FILE}") data = prepare_flash_write_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.FLASH_READ_FILE: + if cmd_str == OpCode.FLASH_READ_FILE: q.add_log_cmd(f"{prefix}: {Info.FLASH_READ_FILE}") data = prepare_flash_read_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.FLASH_DELETE_FILE: + if cmd_str == OpCode.FLASH_DELETE_FILE: q.add_log_cmd("PLOC MPSoC: Flash delete") data = prepare_flash_delete_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code in OpCode.REPLAY_START: + if cmd_str in OpCode.REPLAY_START: q.add_log_cmd(f"{prefix}: {Info.REPLAY_START}") data = prepare_replay_start_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.REPLAY_STOP: + if cmd_str == OpCode.REPLAY_STOP: q.add_log_cmd("PLOC MPSoC: Replay stop") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_REPLAY_STOP) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.DOWNLINK_PWR_ON: + if cmd_str == OpCode.DOWNLINK_PWR_ON: q.add_log_cmd(f"{prefix}: {OpCode.DOWNLINK_PWR_ON}") data = prepare_downlink_pwr_on_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.DOWNLINK_PWR_OFF: + if cmd_str == OpCode.DOWNLINK_PWR_OFF: q.add_log_cmd("PLOC MPSoC: Downlink pwr off") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_DOWNLINK_PWR_OFF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.FLASH_GET_DIR_CONTENT: + if cmd_str == OpCode.FLASH_GET_DIR_CONTENT: q.add_log_cmd(f"{prefix}: {Info.FLASH_GET_DIR_CONTENT}") dir_name = input("Please specify MPSoC directory name to get information for: ") dir_name = bytearray(dir_name.encode("utf-8")) @@ -254,15 +262,15 @@ def pack_ploc_mpsoc_commands( # noqa C901 user_data=dir_name, ) ) - if op_code == OpCode.REPLAY_WRITE_SEQ: + if cmd_str == OpCode.REPLAY_WRITE_SEQ: q.add_log_cmd(f"{prefix}: {Info.REPLAY_WRITE_SEQ}") data = prepare_replay_write_sequence_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "12": + if cmd_str == "12": q.add_log_cmd("PLOC MPSoC: Reset OBSW sequence count") data = object_id.as_bytes + struct.pack("!I", ActionId.OBSW_RESET_SEQ_COUNT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.VERIFY_BOOT: + if cmd_str == OpCode.VERIFY_BOOT: num_words = 1 q.add_log_cmd(f"{prefix} {Info.VERIFY_BOOT}") data = ( @@ -272,15 +280,15 @@ def pack_ploc_mpsoc_commands( # noqa C901 + struct.pack("!H", num_words) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.MODE_REPLAY: + if cmd_str == OpCode.MODE_REPLAY: q.add_log_cmd("PLOC MPSoC: Tc mode replay") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_REPLAY) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.MODE_IDLE: + if cmd_str == OpCode.MODE_IDLE: q.add_log_cmd("PLOC MPSoC: Tc mode idle") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_IDLE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "16": + if cmd_str == "16": q.add_log_cmd("PLOC MPSoC: Tc cam command send") cam_cmd = input("Specify cam command string: ") data = ( @@ -289,27 +297,27 @@ def pack_ploc_mpsoc_commands( # noqa C901 + bytearray(cam_cmd, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "17": + if cmd_str == "17": q.add_log_cmd("PLOC MPSoC: Set UART TX tristate") data = object_id.as_bytes + struct.pack("!I", ActionId.SET_UART_TX_TRISTATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == "18": + if cmd_str == "18": q.add_log_cmd("PLOC MPSoC: Release UART TX") data = object_id.as_bytes + struct.pack("!I", ActionId.RELEASE_UART_TX) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.CAM_TAKE_PIC: + if cmd_str == OpCode.CAM_TAKE_PIC: q.add_log_cmd("PLOC MPSoC: Cam take picture") data = prepare_cam_take_pic_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.SIMPLEX_SEND_FILE: + if cmd_str == OpCode.SIMPLEX_SEND_FILE: q.add_log_cmd("PLOC MPSoC: Simplex send file") data = prepare_simplex_send_file_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.DOWNLINK_DATA_MODULATE: + if cmd_str == OpCode.DOWNLINK_DATA_MODULATE: q.add_log_cmd("PLOC MPSoC: Downlink data modulate") data = prepare_downlink_data_modulate_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) - if op_code == OpCode.MODE_SNAPSHOT: + if cmd_str == OpCode.MODE_SNAPSHOT: q.add_log_cmd("PLOC MPSoC: Mode snapshot") data = object_id.as_bytes + struct.pack("!I", ActionId.TC_MODE_SNAPSHOT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) @@ -671,10 +679,7 @@ def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytea current_idx = 0 dir_name_short = custom_data[current_idx : current_idx + 12].decode("utf-8") current_idx += 12 - num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[ - 0 - ] - current_idx += 4 + num_elements = struct.unpack("!I", custom_data[current_idx : current_idx + 4])[0] elem_names = [] elem_attrs = [] elem_sizes = [] diff --git a/eive_tmtc/tmtc/payload/ploc_supervisor.py b/eive_tmtc/tmtc/payload/ploc_supervisor.py index a48bc43..3b39969 100644 --- a/eive_tmtc/tmtc/payload/ploc_supervisor.py +++ b/eive_tmtc/tmtc/payload/ploc_supervisor.py @@ -11,14 +11,11 @@ import logging import struct from eive_tmtc.config.object_ids import PLOC_SUPV_ID, get_object_ids -from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid -from tmtccmd.config import TmtcDefinitionWrapper -from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry -from tmtccmd.tmtc import service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.config.tmtc import CmdTreeNode +from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter @@ -26,7 +23,7 @@ from eive_tmtc.utility.input_helper import InputHelper _LOGGER = logging.getLogger(__name__) -latchup_id_dict = { +LATCHUP_ID_DICT = { "0": "0.85V", "1": "1.8V", "2": "MISC", @@ -135,22 +132,23 @@ class SetIds(enum.IntEnum): BOOT_STATUS_REPORT = 103 -class OpCodes: - OFF = ["0", "off"] - ON = ["1", "on"] - NORMAL = ["2", "nml"] - HK_TO_OBC = ["3", "hk_to_obc"] - REQUEST_HK = ["4", "req_hk"] - START_MPSOC = ["5", "start_mpsoc"] - SHUTDOWN_MPSOC = ["6", "stop_mpsoc"] - SEL_NVM = ["7", "sel_nvm"] - SET_TIME_REF = ["set_time_ref"] - FACTORY_FLASH = ["factory_flash"] - REQ_BOOT_STATUS_REPORT = ["13", "boot_report"] - START_UPDATE = ["42", "start_update"] - PERFORM_UPDATE = ["update"] - FACTORY_RESET = ["factory_reset"] - MEM_CHECK = ["mem_check"] +class OpCode(str, enum.Enum): + value: str + OFF = "off" + ON = "on" + NML = "nml" + HK_TO_OBC = "hk_to_obc" + REQUEST_HK = "req_hk" + START_MPSOC = "start_mpsoc" + SHUTDOWN_MPSOC = "stop_mpsoc" + SEL_NVM = "sel_nvm" + SET_TIME_REF = "set_time_ref" + FACTORY_FLASH = "factory_flash" + REQ_BOOT_STATUS_REPORT = "boot_report" + START_UPDATE = "start_update" + PERFORM_UPDATE = "update" + FACTORY_RESET = "factory_reset" + MEM_CHECK = "mem_check" RESET_MPSOC = "reset_mpsoc" @@ -161,6 +159,8 @@ class Info(str, enum.Enum): NML = "Switch Normal" HK_TO_OBC = "Request HK from PLOC SUPV" REQUEST_HK = "Request HK set from PLOC Handler" + START_MPSOC = "Start MPSoC" + SHUTDOWN_MPSOC = "Shutdown MPSoC" SET_TIME_REF = "Set time reference" FACTORY_FLASH = "Factory Flash Mode" PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" @@ -172,95 +172,54 @@ class Info(str, enum.Enum): RESET_MPSOC = "Reset MPSoC" -@tmtc_definitions_provider -def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(OpCodes.OFF, Info.OFF) - oce.add(OpCodes.ON, Info.ON) - oce.add(OpCodes.NORMAL, Info.NML) - oce.add(OpCodes.HK_TO_OBC, Info.HK_TO_OBC) - oce.add(OpCodes.REQUEST_HK, Info.REQUEST_HK) - oce.add(OpCodes.START_MPSOC, "PLOC Supervisor: Start MPSoC") - oce.add(OpCodes.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") - oce.add(OpCodes.SEL_NVM, Info.SEL_NVM) - oce.add(OpCodes.SET_TIME_REF, Info.SET_TIME_REF) - oce.add(OpCodes.FACTORY_RESET, Info.FACTORY_RESET) - oce.add(OpCodes.RESET_MPSOC, Info.RESET_MPSOC) - oce.add("8", "PLOC Supervisor: Set max restart tries") - oce.add("11", "PLOC Supervisor: Set boot timeout") - oce.add("12", "PLOC Supervisor: Disable Hk") - oce.add(OpCodes.REQ_BOOT_STATUS_REPORT, Info.REQ_BOOT_STATUS_REPORT) - oce.add("17", "PLOC Supervisor: Enable latchup alert") - oce.add("18", "PLOC Supervisor: Disable latchup alert") - oce.add("20", "PLOC Supervisor: Set alert limit") - oce.add("23", "PLOC Supervisor: Set ADC enabled channels") - oce.add("24", "PLOC Supervisor: Set ADC window and stride") - oce.add("25", "PLOC Supervisor: Set ADC threshold") - oce.add("26", "PLOC Supervisor: Request latchup status report") - oce.add("27", "PLOC Supervisor: Copy ADC data to MRAM") - oce.add("30", "PLOC Supervisor: Run auto EM tests") - oce.add("31", "PLOC Supervisor: MRAM Wipe") - oce.add("35", "PLOC Supervisor: Set GPIO") - oce.add("36", "PLOC Supervisor: Read GPIO") - oce.add("37", "PLOC Supervisor: Restart supervisor") - oce.add(OpCodes.PERFORM_UPDATE, Info.PERFORM_UPDATE) - oce.add(OpCodes.START_UPDATE, Info.START_UPDATE) - oce.add("43", "PLOC Supervisor: Terminate supervisor process") - oce.add("44", "PLOC Supervisor: Start MPSoC quiet") - oce.add("45", "PLOC Supervisor: Set shutdown timeout") - oce.add(OpCodes.FACTORY_FLASH, Info.FACTORY_FLASH) - oce.add("47", "PLOC Supervisor: Enable auto TM") - oce.add("48", "PLOC Supervisor: Disable auto TM") - oce.add("51", "PLOC Supervisor: Logging request event buffers") - oce.add("52", "PLOC Supervisor: Logging clear counters") - oce.add("53", "PLOC Supervisor: Logging set topic") - oce.add("54", "PLOC Supervisor: Logging request counters") - oce.add("55", "PLOC Supervisor: Request ADC Report") - oce.add("56", "PLOC Supervisor: Reset PL") - oce.add("57", "PLOC Supervisor: Enable NVMs") - oce.add("58", "PLOC Supervisor: Continue update") - oce.add(OpCodes.MEM_CHECK, Info.MEM_CHECK) - defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce) +def create_ploc_supv_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + node = CmdTreeNode("ploc_supv", "PLOC Supervisor") + for op_code, info in combined_dict.items(): + node.add_child(CmdTreeNode(op_code, info)) + return node -@service_provider(CustomServiceList.PLOC_SUPV) -def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 - q = p.queue_helper - op_code = p.op_code +def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 object_id = get_object_ids().get(PLOC_SUPV_ID) + assert object_id is not None q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") obyt = object_id.as_bytes prefix = "PLOC Supervisor" - if op_code in OpCodes.OFF: + if cmd_str == OpCode.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.ON: + if cmd_str == OpCode.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") command = pack_mode_data(object_id.as_bytes, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.NORMAL: + if cmd_str == OpCode.NML: q.add_log_cmd(f"{prefix}: {Info.NML}") command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) - if op_code in OpCodes.HK_TO_OBC: + if cmd_str == OpCode.HK_TO_OBC: q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") command = obyt + struct.pack("!I", SupvActionId.HK_REPORT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.REQUEST_HK: + if cmd_str == OpCode.REQUEST_HK: q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK}") sid = make_sid(object_id.as_bytes, SetIds.HK_REPORT) cmd = generate_one_hk_command(sid) q.add_pus_tc(cmd) - elif op_code in OpCodes.START_MPSOC: + elif cmd_str == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SHUTDOWN_MPSOC: + if cmd_str == OpCode.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SEL_NVM: + if cmd_str == OpCode.SEL_NVM: q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) bp0 = int(input("BP0 (0 or 1): ")) @@ -268,7 +227,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 bp2 = int(input("BP2 (0 or 1): ")) command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.FACTORY_RESET: + if cmd_str == OpCode.FACTORY_RESET: q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}") while True: print("Please select the key for a factory reset operation") @@ -285,7 +244,7 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 user_data=bytes([key]), ) ) - if op_code == "8": + if cmd_str == "8": q.add_log_cmd("PLOC Supervisor: Set max restart tries") restart_tries = int(input("Specify maximum restart tries: ")) command = ( @@ -294,15 +253,15 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 + struct.pack("!B", restart_tries) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == OpCodes.RESET_MPSOC: + if cmd_str == OpCode.RESET_MPSOC: q.add_log_cmd(Info.RESET_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.SET_TIME_REF: + if cmd_str in OpCode.SET_TIME_REF: q.add_log_cmd("PLOC Supervisor: Set time reference") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "11": + if cmd_str == "11": q.add_log_cmd("PLOC Supervisor: Set boot timeout") boot_timeout = int(input("Specify boot timeout [ms]: ")) command = ( @@ -311,11 +270,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 + struct.pack("!I", boot_timeout) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "12": + if cmd_str == "12": q.add_log_cmd("PLOC Supervisor: Disable HK") command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.REQ_BOOT_STATUS_REPORT: + if cmd_str in OpCode.REQ_BOOT_STATUS_REPORT: q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.GET_BOOT_STATUS_REPORT @@ -325,129 +284,129 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 sid = make_sid(object_id.as_bytes, SetIds.BOOT_STATUS_REPORT) req_hk = generate_one_hk_command(sid) q.add_pus_tc(req_hk) - if op_code == "17": + if cmd_str == "17": q.add_log_cmd("PLOC Supervisor: Enable latchup alert") command = pack_lachtup_alert_cmd(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "18": + if cmd_str == "18": q.add_log_cmd("PLOC Supervisor: Disable latchup alert") command = pack_lachtup_alert_cmd(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "20": + if cmd_str == "20": q.add_log_cmd("PLOC Supervisor: Set alert limit") command = pack_set_alert_limit_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "23": + if cmd_str == "23": q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels") command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "24": + if cmd_str == "24": q.add_log_cmd("PLOC Supervisor: Set ADC window and stride") command = pack_set_adc_window_and_stride_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "25": + if cmd_str == "25": q.add_log_cmd("PLOC Supervisor: Set ADC threshold") command = pack_set_adc_threshold_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "26": + if cmd_str == "26": q.add_log_cmd("PLOC Supervisor: Request latchup status report") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.GET_LATCHUP_STATUS_REPORT ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "27": + if cmd_str == "27": q.add_log_cmd("PLOC Supervisor: Copy ADC data to MRAM") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.COPY_ADC_DATA_TO_MRAM ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "30": + if cmd_str == "30": q.add_log_cmd("PLOC Supervisor: Run auto EM tests") command = pack_auto_em_tests_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "31": + if cmd_str == "31": q.add_log_cmd("PLOC Supervisor: Wipe MRAM") command = pack_mram_wipe_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "35": + if cmd_str == "35": q.add_log_cmd("PLOC Supervisor: Set GPIO command") command = pack_set_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "36": + if cmd_str == "36": q.add_log_cmd("PLOC Supervisor: Read GPIO command") command = pack_read_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "37": + if cmd_str == "37": q.add_log_cmd("PLOC Supervisor: Restart supervisor") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.RESTART_SUPERVISOR ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.START_UPDATE: + if cmd_str in OpCode.START_UPDATE: q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") command = pack_update_command(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.PERFORM_UPDATE: + if cmd_str in OpCode.PERFORM_UPDATE: q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") command = pack_update_command(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "43": + if cmd_str == "43": q.add_log_cmd("PLOC Supervisor: Terminate supervisor process") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.TERMINATE_SUPV_HELPER ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "44": + if cmd_str == "44": q.add_log_cmd("PLOC Supervisor: Start MPSoC quiet") command = object_id.as_bytes + struct.pack("!I", SupvActionId.START_MPSOC_QUIET) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "45": + if cmd_str == "45": q.add_log_cmd("PLOC Supervisor: Set shutdown timeout") command = pack_set_shutdown_timeout_command(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.FACTORY_FLASH: + if cmd_str in OpCode.FACTORY_FLASH: q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "47": + if cmd_str == "47": q.add_log_cmd("PLOC Supervisor: Enable auto TM") command = object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_AUTO_TM) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "48": + if cmd_str == "48": q.add_log_cmd("PLOC Supervisor: Disable auto TM") command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_AUTO_TM) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "51": + if cmd_str == "51": q.add_log_cmd("PLOC Supervisor: Logging request event buffers") command = pack_logging_buffer_request(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "52": + if cmd_str == "52": q.add_log_cmd("PLOC Supervisor: Logging clear counters") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.LOGGING_CLEAR_COUNTERS ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "53": + if cmd_str == "53": q.add_log_cmd("PLOC Supervisor: Logging set topic") command = pack_logging_set_topic(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "54": + if cmd_str == "54": q.add_log_cmd("PLOC Supervisor: Logging request counters") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.LOGGING_REQUEST_COUNTERS ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "55": + if cmd_str == "55": q.add_log_cmd("PLOC Supervisor: Request ADC report") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.REQUEST_ADC_REPORT ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "56": + if cmd_str == "56": q.add_log_cmd("PLOC Supervisor: Reset PL") command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "57": + if cmd_str == "57": q.add_log_cmd("PLOC Supervisor: Enable NVMs") nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: ")) nvm3 = int(input("Enable (1) or disable(0) NVM 3: ")) @@ -458,11 +417,11 @@ def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 + struct.pack("B", nvm3) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "58": + if cmd_str == "58": q.add_log_cmd("PLOC Supervisor: Continue update") command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code in OpCodes.MEM_CHECK: + if cmd_str == OpCode.MEM_CHECK: custom_data = bytearray() update_file = get_update_file() memory_id = int(input("Specify memory ID: ")) @@ -545,9 +504,9 @@ def get_latchup_id() -> int: description_string = "Description".ljust(description_column_width) print(f"{key_string} | {description_string}") print(separator_string) - for key in latchup_id_dict: + for key in LATCHUP_ID_DICT: key_string = key.ljust(key_column_width) - description_string = latchup_id_dict[key].ljust(description_column_width) + description_string = LATCHUP_ID_DICT[key].ljust(description_column_width) print(f"{key_string} | {description_string}") return int(input("Specify latchup ID: ")) diff --git a/eive_tmtc/tmtc/payload/scex.py b/eive_tmtc/tmtc/payload/scex.py index 2b5a198..d39e078 100644 --- a/eive_tmtc/tmtc/payload/scex.py +++ b/eive_tmtc/tmtc/payload/scex.py @@ -2,16 +2,12 @@ import enum import json from spacepackets.ecss import PusTelecommand - -from eive_tmtc.config.definitions import CustomServiceList -from tmtccmd.config.tmtc import tmtc_definitions_provider -from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data, Subservice -from tmtccmd.tmtc import DefaultPusQueueHelper, service_provider -from tmtccmd.tmtc.decorator import ServiceProviderParams +from tmtccmd.config.tmtc import CmdTreeNode from tmtccmd.pus.s8_fsfw_action import create_action_cmd -from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper -from eive_tmtc.config.object_ids import SCEX_HANDLER_ID +from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice, pack_mode_data +from tmtccmd.tmtc import DefaultPusQueueHelper +from eive_tmtc.config.object_ids import SCEX_HANDLER_ID USE_SCEX_CONF_FILE = True @@ -26,10 +22,8 @@ class OpCode: ALL_CELLS_CMD = "allcells" FRAM = "fram" - ON = "on" SWITCH_ON = "on" - OFF = "off" - SWITCH_OFF = OFF + SWITCH_OFF = "off" NORMAL = "normal" @@ -59,28 +53,20 @@ class Info: NORMAL = "Switch SCEX to normal mode" -@tmtc_definitions_provider -def add_scex_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add(keys=OpCode.PING, info=Info.PING) - oce.add(keys=OpCode.ION_CMD, info=Info.ION_CMD) - oce.add(keys=OpCode.TEMP_CMD, info=Info.TEMP_CMD) - oce.add(keys=OpCode.EXP_STATUS_CMD, info=Info.EXP_STATUS_CMD) - oce.add(keys=OpCode.ONE_CELLS_CMD, info=Info.ONE_CELLS_CMD) - - oce.add(keys=OpCode.ALL_CELLS_CMD, info=Info.ALL_CELLS_CMD) - oce.add(keys=OpCode.FRAM, info=Info.FRAM) - oce.add(keys=OpCode.ON, info=Info.SWITCH_ON) - oce.add(keys=OpCode.OFF, info=Info.SWITCH_OFF) - oce.add(keys=OpCode.NORMAL, info=Info.NORMAL) - - defs.add_service( - name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce - ) +def create_scex_node() -> CmdTreeNode: + op_code_strs = [ + getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__") + ] + info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] + combined_dict = dict(zip(op_code_strs, info_strs)) + scex_node = CmdTreeNode("ploc_mpsoc", "PLOC MPSoC") + for op_code, info in combined_dict.items(): + scex_node.add_child(CmdTreeNode(op_code, info)) + return scex_node def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 - if cmd_str == OpCode.ON: + if cmd_str == OpCode.SWITCH_ON: q.add_log_cmd(Info.SWITCH_ON) q.add_pus_tc( PusTelecommand( @@ -98,7 +84,7 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 app_data=pack_mode_data(SCEX_HANDLER_ID, Mode.NORMAL, 0), ) ) - if cmd_str == OpCode.OFF: + if cmd_str == OpCode.SWITCH_OFF: q.add_log_cmd(Info.SWITCH_OFF) q.add_pus_tc( PusTelecommand( @@ -158,7 +144,8 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 dac_weight1 = json_data["dac_weight1"] dac_weight2 = json_data["dac_weight2"] dac_weight3 = json_data["dac_weight3"] - + else: + raise ValueError("CLI support for SCEX params not implemented") # in app_data # app_data.extend(struct.pack("!H", first_dac)) app_data.append(cell_select) @@ -192,6 +179,8 @@ def pack_scex_cmds(q: DefaultPusQueueHelper, cmd_str: str): # noqa C901 dac_weight2 = json_data["dac_weight2"] dac_weight3 = json_data["dac_weight3"] + else: + raise ValueError("CLI support for SCEX params not implemented") # in app_data # app_data.extend(struct.pack("!H", first_dac)) append_16_bit_val(packet=app_data, val=first_dac[cn]) diff --git a/tmtcc.py b/tmtcc.py index 0f10f09..c801358 100755 --- a/tmtcc.py +++ b/tmtcc.py @@ -30,6 +30,7 @@ from tmtccmd.config.args import ( PreArgsParsingWrapper, ProcedureParamsWrapper, SetupParams, + perform_tree_printout, ) from tmtccmd.core import BackendRequest from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter @@ -133,6 +134,9 @@ def setup_params() -> Tuple[SetupWrapper, int]: hk_level = int(post_arg_parsing_wrapper.args_raw.hk) else: hk_level = 0 + if params.app_params.print_tree: + perform_tree_printout(params.app_params, hook_obj.get_command_definitions()) + sys.exit(0) params.apid = PUS_APID if params.com_if is None: raise ValueError("could not determine a COM interface.")