# -*- coding: utf-8 -*- """ @file ploc_supervisor.py @brief Tests for commanding the supervisor of the PLOC. The supervisor is programmed by Thales. @author J. Meier @date 10.07.2021 """ import enum import logging import struct from eive_tmtc.config.object_ids import PLOC_SUPV_ID, get_object_ids from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.config import TmtcDefinitionWrapper from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry from tmtccmd.tmtc import service_provider from tmtccmd.tmtc.decorator import ServiceProviderParams from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from eive_tmtc.utility.input_helper import InputHelper _LOGGER = logging.getLogger(__name__) latchup_id_dict = { "0": "0.85V", "1": "1.8V", "2": "MISC", "3": "3.3V", "4": "NVM_4XO", "5": "MISSION", "6": "SAFECOTS", } HARDCODED = "0" MANUAL_INPUT = "1" HARDCODED_FILE = "/home/rmueller/EIVE/mpsoc_boot.bin" update_file_dict = { HARDCODED: ["hardcoded", ""], MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/supervisor/update.bin", "/mnt/sd0/ploc/supervisor/update.bin"], "3": [ "/mnt/sd0/ploc/supervisor/update-large.bin", "/mnt/sd0/ploc/supervisor/update-large.bin", ], "4": [ "/mnt/sd0/ploc/supervisor/update-small.bin", "/mnt/sd0/ploc/supervisor/update-small.bin", ], "5": [ "/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin", "/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin", ], } event_buffer_path_dict = { MANUAL_INPUT: ["manual input", ""], "2": ["/mnt/sd0/ploc/supervisor", "/mnt/sd0/ploc/supervisor"], } FACTORY_RESET_OPS = { 0x00: "CLEAR_MRAM_EVENT_BUF", 0x01: "CLEAR_MRAM_ADC_BUF", 0x02: "FACTORY_DEFAULT_MRAM_SYS_CFG", 0x03: "FACTORY_DEFAULT_MRAM_DBG_CFG", 0x04: "FACTORY_DEFAULT_BOOTMAN_CFG", 0x05: "FACTORY_DEFAULT_DATA_LOGGER", 0x06: "DATA_LOGGER_OP_DATA_TO_ZERO", 0x07: "FACTORY_DEFAULT_MRAM_LATCHUP_MON", 0x08: "FACTORY_DEFAULT_ADC_MON_CFG", 0x09: "FACTORY_DEFAULT_WATCHDOG_MON_CFG", 0x0A: "FACTORY_DEFAULT_HK_CFG", 0x0B: "FACTORY_DEFAULT_MEM_MAN_CFG", 0x10: "REDWIRE_TASK_1", 0x11: "REDWIRE_TASK_2", 0x12: "REDWIRE_TASK_3", } class SupvActionId(enum.IntEnum): HK_REPORT = 1 START_MPSOC = 3 SHUTWOWN_MPSOC = 4 SEL_MPSOC_BOOT_IMAGE = 5 SET_BOOT_TIMEOUT = 6 SET_MAX_RESTART_TRIES = 7 RESET_MPSOC = 8 SET_TIME_REF = 9 DISABLE_HK = 10 GET_BOOT_STATUS_REPORT = 11 UPDATE_AVAILABLE = 12 ENABLE_LATCHUP_ALERT = 15 DISABLE_LATCHUP_ALERT = 16 SET_ALERT_LIMIT = 18 SET_ADC_SWEEP_PERIOD = 20 SET_ADC_ENABLED_CHANNELS = 21 SET_ADC_WINDOW_AND_STRIDE = 22 SET_ADC_THRESHOLD = 23 GET_LATCHUP_STATUS_REPORT = 24 COPY_ADC_DATA_TO_MRAM = 25 SELECT_NVM = 27 RUN_AUTO_EM_TESTS = 28 WIPE_MRAM = 29 DUMP_MRAM = 30 SET_GPIO = 34 READ_GPIO = 35 RESTART_SUPERVISOR = 36 LOGGING_REQUEST_COUNTERS = 38 FACTORY_RESET = 39 START_MPSOC_QUIET = 45 SET_SHUTDOWN_TIMEOUT = 46 FACTORY_FLASH = 47 PERFORM_UPDATE = 48 TERMINATE_SUPV_HELPER = 49 ENABLE_AUTO_TM = 50 DISABLE_AUTO_TM = 51 LOGGING_REQUEST_EVENT_BUFFERS = 54 LOGGING_CLEAR_COUNTERS = 55 LOGGING_SET_TOPIC = 56 REQUEST_ADC_REPORT = 57 RESET_PL = 58 ENABLE_NVMS = 59 CONTINUE_UPDATE = 60 MEM_CHECK = 61 class SetIds(enum.IntEnum): HK_REPORT = 102 BOOT_STATUS_REPORT = 103 class OpCodes: OFF = ["0", "off"] ON = ["1", "on"] NORMAL = ["2", "nml"] HK_TO_OBC = ["3", "hk_to_obc"] REQUEST_HK = ["4", "req_hk"] START_MPSOC = ["5", "start_mpsoc"] SHUTDOWN_MPSOC = ["6", "stop_mpsoc"] SEL_NVM = ["7", "sel_nvm"] SET_TIME_REF = ["set_time_ref"] FACTORY_FLASH = ["factory_flash"] REQ_BOOT_STATUS_REPORT = ["13", "boot_report"] START_UPDATE = ["42", "start_update"] PERFORM_UPDATE = ["update"] FACTORY_RESET = ["factory_reset"] MEM_CHECK = ["mem_check"] RESET_MPSOC = "reset_mpsoc" class Info(str, enum.Enum): value: str OFF = "Switch Off" ON = "Switch On" NML = "Switch Normal" HK_TO_OBC = "Request HK from PLOC SUPV" REQUEST_HK = "Request HK set from PLOC Handler" SET_TIME_REF = "Set time reference" FACTORY_FLASH = "Factory Flash Mode" PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" START_UPDATE = "Start new MPSoC SW update" FACTORY_RESET = "Factory Reset of loggers" REQ_BOOT_STATUS_REPORT = "Request boot status report and HK" MEM_CHECK = "Memory Check" SEL_NVM = "Select NVM" RESET_MPSOC = "Reset MPSoC" @tmtc_definitions_provider def add_ploc_supv_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(OpCodes.OFF, Info.OFF) oce.add(OpCodes.ON, Info.ON) oce.add(OpCodes.NORMAL, Info.NML) oce.add(OpCodes.HK_TO_OBC, Info.HK_TO_OBC) oce.add(OpCodes.REQUEST_HK, Info.REQUEST_HK) oce.add(OpCodes.START_MPSOC, "PLOC Supervisor: Start MPSoC") oce.add(OpCodes.SHUTDOWN_MPSOC, "PLOC Supervisor: Shutdown MPSoC") oce.add(OpCodes.SEL_NVM, Info.SEL_NVM) oce.add(OpCodes.SET_TIME_REF, Info.SET_TIME_REF) oce.add(OpCodes.FACTORY_RESET, Info.FACTORY_RESET) oce.add(OpCodes.RESET_MPSOC, Info.RESET_MPSOC) oce.add("8", "PLOC Supervisor: Set max restart tries") oce.add("11", "PLOC Supervisor: Set boot timeout") oce.add("12", "PLOC Supervisor: Disable Hk") oce.add(OpCodes.REQ_BOOT_STATUS_REPORT, Info.REQ_BOOT_STATUS_REPORT) oce.add("17", "PLOC Supervisor: Enable latchup alert") oce.add("18", "PLOC Supervisor: Disable latchup alert") oce.add("20", "PLOC Supervisor: Set alert limit") oce.add("23", "PLOC Supervisor: Set ADC enabled channels") oce.add("24", "PLOC Supervisor: Set ADC window and stride") oce.add("25", "PLOC Supervisor: Set ADC threshold") oce.add("26", "PLOC Supervisor: Request latchup status report") oce.add("27", "PLOC Supervisor: Copy ADC data to MRAM") oce.add("30", "PLOC Supervisor: Run auto EM tests") oce.add("31", "PLOC Supervisor: MRAM Wipe") oce.add("35", "PLOC Supervisor: Set GPIO") oce.add("36", "PLOC Supervisor: Read GPIO") oce.add("37", "PLOC Supervisor: Restart supervisor") oce.add(OpCodes.PERFORM_UPDATE, Info.PERFORM_UPDATE) oce.add(OpCodes.START_UPDATE, Info.START_UPDATE) oce.add("43", "PLOC Supervisor: Terminate supervisor process") oce.add("44", "PLOC Supervisor: Start MPSoC quiet") oce.add("45", "PLOC Supervisor: Set shutdown timeout") oce.add(OpCodes.FACTORY_FLASH, Info.FACTORY_FLASH) oce.add("47", "PLOC Supervisor: Enable auto TM") oce.add("48", "PLOC Supervisor: Disable auto TM") oce.add("51", "PLOC Supervisor: Logging request event buffers") oce.add("52", "PLOC Supervisor: Logging clear counters") oce.add("53", "PLOC Supervisor: Logging set topic") oce.add("54", "PLOC Supervisor: Logging request counters") oce.add("55", "PLOC Supervisor: Request ADC Report") oce.add("56", "PLOC Supervisor: Reset PL") oce.add("57", "PLOC Supervisor: Enable NVMs") oce.add("58", "PLOC Supervisor: Continue update") oce.add(OpCodes.MEM_CHECK, Info.MEM_CHECK) defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce) @service_provider(CustomServiceList.PLOC_SUPV) def pack_ploc_supv_commands(p: ServiceProviderParams): # noqa C901 q = p.queue_helper op_code = p.op_code object_id = get_object_ids().get(PLOC_SUPV_ID) q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") obyt = object_id.as_bytes prefix = "PLOC Supervisor" if op_code in OpCodes.OFF: q.add_log_cmd(f"{prefix}: {Info.OFF}") command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code in OpCodes.ON: q.add_log_cmd(f"{prefix}: {Info.ON}") command = pack_mode_data(object_id.as_bytes, Mode.ON, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code in OpCodes.NORMAL: q.add_log_cmd(f"{prefix}: {Info.NML}") command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command)) if op_code in OpCodes.HK_TO_OBC: q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") command = obyt + struct.pack("!I", SupvActionId.HK_REPORT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.REQUEST_HK: q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK}") sid = make_sid(object_id.as_bytes, SetIds.HK_REPORT) cmd = generate_one_hk_command(sid) q.add_pus_tc(cmd) elif op_code in OpCodes.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.SHUTDOWN_MPSOC: q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.SEL_NVM: q.add_log_cmd("PLOC Supervisor: Select MPSoC boot image") mem = int(input("MEM (NVM0 - 0 or NVM1 - 1): ")) bp0 = int(input("BP0 (0 or 1): ")) bp1 = int(input("BP1 (0 or 1): ")) bp2 = int(input("BP2 (0 or 1): ")) command = pack_sel_boot_image_cmd(object_id.as_bytes, mem, bp0, bp1, bp2) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.FACTORY_RESET: q.add_log_cmd(f"{prefix}: {Info.FACTORY_RESET}") while True: print("Please select the key for a factory reset operation") for key, val in FACTORY_RESET_OPS.items(): print(f"{key}: {val}") key = int(input("Key Select: ")) if key not in FACTORY_RESET_OPS: print("Key invalid!") break q.add_pus_tc( create_action_cmd( object_id=PLOC_SUPV_ID, action_id=SupvActionId.FACTORY_RESET, user_data=bytes([key]), ) ) if op_code == "8": q.add_log_cmd("PLOC Supervisor: Set max restart tries") restart_tries = int(input("Specify maximum restart tries: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.SET_MAX_RESTART_TRIES) + struct.pack("!B", restart_tries) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == OpCodes.RESET_MPSOC: q.add_log_cmd(Info.RESET_MPSOC) command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.SET_TIME_REF: q.add_log_cmd("PLOC Supervisor: Set time reference") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SET_TIME_REF) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "11": q.add_log_cmd("PLOC Supervisor: Set boot timeout") boot_timeout = int(input("Specify boot timeout [ms]: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.SET_BOOT_TIMEOUT) + struct.pack("!I", boot_timeout) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "12": q.add_log_cmd("PLOC Supervisor: Disable HK") command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.REQ_BOOT_STATUS_REPORT: q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.GET_BOOT_STATUS_REPORT ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_wait_seconds(2.0) sid = make_sid(object_id.as_bytes, SetIds.BOOT_STATUS_REPORT) req_hk = generate_one_hk_command(sid) q.add_pus_tc(req_hk) if op_code == "17": q.add_log_cmd("PLOC Supervisor: Enable latchup alert") command = pack_lachtup_alert_cmd(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "18": q.add_log_cmd("PLOC Supervisor: Disable latchup alert") command = pack_lachtup_alert_cmd(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "20": q.add_log_cmd("PLOC Supervisor: Set alert limit") command = pack_set_alert_limit_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "23": q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels") command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "24": q.add_log_cmd("PLOC Supervisor: Set ADC window and stride") command = pack_set_adc_window_and_stride_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "25": q.add_log_cmd("PLOC Supervisor: Set ADC threshold") command = pack_set_adc_threshold_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "26": q.add_log_cmd("PLOC Supervisor: Request latchup status report") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.GET_LATCHUP_STATUS_REPORT ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "27": q.add_log_cmd("PLOC Supervisor: Copy ADC data to MRAM") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.COPY_ADC_DATA_TO_MRAM ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "30": q.add_log_cmd("PLOC Supervisor: Run auto EM tests") command = pack_auto_em_tests_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "31": q.add_log_cmd("PLOC Supervisor: Wipe MRAM") command = pack_mram_wipe_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "35": q.add_log_cmd("PLOC Supervisor: Set GPIO command") command = pack_set_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "36": q.add_log_cmd("PLOC Supervisor: Read GPIO command") command = pack_read_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "37": q.add_log_cmd("PLOC Supervisor: Restart supervisor") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.RESTART_SUPERVISOR ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.START_UPDATE: q.add_log_cmd("PLOC Supversior: Start new MPSoC SW update") command = pack_update_command(object_id.as_bytes, True) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.PERFORM_UPDATE: q.add_log_cmd("PLOC Supervisor: Perform MPSoC SW update") command = pack_update_command(object_id.as_bytes, False) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "43": q.add_log_cmd("PLOC Supervisor: Terminate supervisor process") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.TERMINATE_SUPV_HELPER ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "44": q.add_log_cmd("PLOC Supervisor: Start MPSoC quiet") command = object_id.as_bytes + struct.pack("!I", SupvActionId.START_MPSOC_QUIET) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "45": q.add_log_cmd("PLOC Supervisor: Set shutdown timeout") command = pack_set_shutdown_timeout_command(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.FACTORY_FLASH: q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "47": q.add_log_cmd("PLOC Supervisor: Enable auto TM") command = object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_AUTO_TM) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "48": q.add_log_cmd("PLOC Supervisor: Disable auto TM") command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_AUTO_TM) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "51": q.add_log_cmd("PLOC Supervisor: Logging request event buffers") command = pack_logging_buffer_request(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "52": q.add_log_cmd("PLOC Supervisor: Logging clear counters") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.LOGGING_CLEAR_COUNTERS ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "53": q.add_log_cmd("PLOC Supervisor: Logging set topic") command = pack_logging_set_topic(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "54": q.add_log_cmd("PLOC Supervisor: Logging request counters") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.LOGGING_REQUEST_COUNTERS ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "55": q.add_log_cmd("PLOC Supervisor: Request ADC report") command = object_id.as_bytes + struct.pack( "!I", SupvActionId.REQUEST_ADC_REPORT ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "56": q.add_log_cmd("PLOC Supervisor: Reset PL") command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "57": q.add_log_cmd("PLOC Supervisor: Enable NVMs") nvm01 = int(input("Enable (1) or disable(0) NVM 0 and 1: ")) nvm3 = int(input("Enable (1) or disable(0) NVM 3: ")) command = ( object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_NVMS) + struct.pack("B", nvm01) + struct.pack("B", nvm3) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "58": q.add_log_cmd("PLOC Supervisor: Continue update") command = object_id.as_bytes + struct.pack("!I", SupvActionId.CONTINUE_UPDATE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code in OpCodes.MEM_CHECK: custom_data = bytearray() update_file = get_update_file() memory_id = int(input("Specify memory ID: ")) start_address = int(input("Specify start address: 0x"), 16) custom_data.extend(update_file.encode("utf-8")) custom_data.append(0) custom_data.extend(struct.pack("!B", memory_id)) custom_data.extend(struct.pack("!I", start_address)) q.add_log_cmd( f"{prefix}: {Info.MEM_CHECK} for file {update_file} at memory ID" f" {memory_id} at start address {start_address}" ) command = create_action_cmd( object_id.as_bytes, SupvActionId.MEM_CHECK, custom_data ) q.add_pus_tc(command) def pack_sel_boot_image_cmd( object_id: bytes, mem: int, bp0: int, bp1: int, bp2: int ) -> bytearray: """This function can be used to generate the command to select the image from which the MPSoC will boot. @param object_id The object id of the PLOC supervisor handler. @param mem The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1) @param bp0 Partition pin 0 @param bp1 Partition pin 1 @param bp2 Partition pin 2 """ command = object_id + struct.pack("!I", SupvActionId.SEL_MPSOC_BOOT_IMAGE) command = command + struct.pack("!B", mem) command = command + struct.pack("!B", bp0) command = command + struct.pack("!B", bp1) command = command + struct.pack("!B", bp2) return bytearray(command) def pack_update_available_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the udpate availabe command. @param object_id The object id of the PLOC supervisor handler. """ image_select = 1 image_partition = 0 image_size = 222 image_crc = 0x0 number_of_packets = 150 command = object_id + struct.pack("!I", SupvActionId.UPDATE_AVAILABLE) command = command + struct.pack("!B", image_select) command = command + struct.pack("!B", image_partition) command = command + struct.pack("!I", image_size) command = command + struct.pack("!I", image_crc) command = command + struct.pack("!I", number_of_packets) return bytearray(command) def pack_lachtup_alert_cmd(object_id: bytes, state: bool) -> bytearray: """ @brief This function packs the command to enable or disable a certain latchup alerts. @param object_id The object id of the PLOC supervisor handler. @param state True - enable latchup alert, False - disable latchup alert """ latchup_id = get_latchup_id() command = bytearray() if state: command = object_id + struct.pack("!I", SupvActionId.ENABLE_LATCHUP_ALERT) else: command = object_id + struct.pack("!I", SupvActionId.DISABLE_LATCHUP_ALERT) command = command + struct.pack("!B", latchup_id) return bytearray(command) def get_latchup_id() -> int: key_column_width = 10 description_column_width = 50 separator_width = key_column_width + description_column_width + 3 separator_string = separator_width * "-" key_string = "Latchup ID".ljust(key_column_width) description_string = "Description".ljust(description_column_width) print(f"{key_string} | {description_string}") print(separator_string) for key in latchup_id_dict: key_string = key.ljust(key_column_width) description_string = latchup_id_dict[key].ljust(description_column_width) print(f"{key_string} | {description_string}") return int(input("Specify latchup ID: ")) def pack_set_alert_limit_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the command to set the limit of a latchup alert. @param object_id The object id of the PLOC supervisor handler. """ latchup_id = get_latchup_id() dutycycle = int(input("Specify dutycycle: ")) command = bytearray() command = object_id + struct.pack("!I", SupvActionId.SET_ALERT_LIMIT) command = command + struct.pack("!B", latchup_id) command = command + struct.pack("!I", dutycycle) return bytearray(command) def pack_set_adc_enabled_channels_cmd(object_id: bytes) -> bytearray: """ @brief This function packs the command to enable or disable channels of the ADC. @param object_id The object id of the PLOC supervisor handler. """ ch = int(input("Specify ch: 0x"), 16) cmd = object_id + struct.pack("!I", SupvActionId.SET_ADC_ENABLED_CHANNELS) cmd = cmd + struct.pack("!H", ch) return bytearray(cmd) def pack_set_adc_window_and_stride_cmd(object_id: bytes) -> bytearray: window_size = int(input("Specify window size: ")) striding_step_size = int(input("Specify striding step size: ")) command = object_id + struct.pack("!I", SupvActionId.SET_ADC_WINDOW_AND_STRIDE) command = command + struct.pack("!H", window_size) command = command + struct.pack("!H", striding_step_size) return bytearray(command) def pack_set_adc_threshold_cmd(object_id: bytes) -> bytearray: threshold = int(input("Specify threshold: ")) command = object_id + struct.pack("!I", SupvActionId.SET_ADC_THRESHOLD) command = command + struct.pack("!I", threshold) return bytearray(command) def pack_select_nvm_cmd(object_id: bytes) -> bytearray: mem = int(input("Specify NVM (0 - NVM0, 1 - MVM1): ")) command = object_id + struct.pack("!I", SupvActionId.SELECT_NVM) command = command + struct.pack("!B", mem) return bytearray(command) def pack_auto_em_tests_cmd(object_id: bytes) -> bytearray: test = int(input("Specify test (1 - complete, 2 - short): ")) command = object_id + struct.pack("!I", SupvActionId.RUN_AUTO_EM_TESTS) command = command + struct.pack("!B", test) return bytearray(command) def pack_mram_wipe_cmd(object_id: bytes) -> bytearray: start = int(input("Start address: 0x"), 16) stop = int(input("Stop address: 0x"), 16) command = object_id + struct.pack("!I", SupvActionId.WIPE_MRAM) command = command + struct.pack("!I", start) command = command + struct.pack("!I", stop) return bytearray(command) def pack_update_command(object_id: bytes, new_update: bool) -> bytearray: command = bytearray() memory_id = int(input("Specify memory ID: ")) start_address = int(input("Specify start address: 0x"), 16) update_file = get_update_file() if new_update: init_bytes_written = 0 init_seq_count = 1 del_mem = True else: init_bytes_written = input("Specify bytes to start from [0 default]: ") if init_bytes_written == "": init_bytes_written = 0 init_bytes_written = int(init_bytes_written) init_seq_count = input("Specify initial sequence count [1 default]: ") if init_seq_count == "": init_seq_count = 1 init_seq_count = int(init_seq_count) del_mem = input("Delete memory? [y/n, y default]: ") if del_mem.lower() in ["y", "1"]: del_mem = 1 elif del_mem.lower() in ["n", "0"]: del_mem = 0 else: raise ValueError("Invalid input, use y or n") command += object_id command += struct.pack("!I", SupvActionId.PERFORM_UPDATE) command += bytearray(update_file, "utf-8") # Adding null terminator command += struct.pack("!B", 0) command += struct.pack("!B", memory_id) command += struct.pack("!I", start_address) command.extend(struct.pack("!I", init_bytes_written)) command.extend(struct.pack("!H", init_seq_count)) command.append(del_mem) return bytearray(command) def pack_set_shutdown_timeout_command(object_id: bytes) -> bytearray: command = bytearray() command += object_id command += struct.pack("!I", SupvActionId.SET_SHUTDOWN_TIMEOUT) timeout = int(input("Specify shutdown timeout (ms): ")) command += struct.pack("!I", timeout) return command def pack_logging_buffer_request(object_id: bytes) -> bytearray: command = bytearray() command += object_id command += struct.pack("!I", SupvActionId.LOGGING_REQUEST_EVENT_BUFFERS) path = get_event_buffer_path() command += bytearray(path, "utf-8") return command def pack_set_gpio_cmd(object_id: bytes) -> bytearray: port = int(input("Specify port: 0x"), 16) pin = int(input("Specify pin: 0x"), 16) val = int(input("Specify val: 0x"), 16) command = bytearray(object_id + struct.pack("!I", SupvActionId.SET_GPIO)) command.append(port) command.append(pin) command.append(val) return bytearray(command) def pack_read_gpio_cmd(object_id: bytes) -> bytearray: port = int(input("Specify port: 0x"), 16) pin = int(input("Specify pin: 0x"), 16) command = object_id + struct.pack("!I", SupvActionId.READ_GPIO) command = command + struct.pack("!B", port) command = command + struct.pack("!B", pin) return bytearray(command) def pack_logging_set_topic(object_id: bytes) -> bytearray: command = object_id + struct.pack("!I", SupvActionId.LOGGING_SET_TOPIC) tpc = int(input("Specify logging topic: ")) command += struct.pack("!B", tpc) return bytearray(command) def get_update_file() -> str: _LOGGER.info("Specify update file ") input_helper = InputHelper(update_file_dict) key = input_helper.get_key() if key == HARDCODED: file = HARDCODED_FILE elif key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify absolute name of update file: ") else: file = update_file_dict[key][1] return file def get_event_buffer_path() -> str: _LOGGER.info("Specify path where to store event buffer file ") input_helper = InputHelper(event_buffer_path_dict) key = input_helper.get_key() if key == MANUAL_INPUT: file = input("Ploc Supervisor: Specify path: ") else: file = event_buffer_path_dict[key][1] return file class SocState(enum.IntEnum): OFF = 0 BOOTING = 1 OPERATIONAL = 2 SHUTDOWN = 3 def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): current_idx = 0 if set_id == SetIds.HK_REPORT: fmt_str = "!IIIQIIIIIBBBB" inc_len = struct.calcsize(fmt_str) ( temp_ps, temp_pl, temp_sup, uptime, cpu_load, avail_heap, num_tcs, num_tms, soc_state, nvm_0_1_state, nvm_3_state, mission_io_state, fmc_state, ) = struct.unpack(fmt_str, hk_data[:inc_len]) pw.dlog(f"Temp PS {temp_ps} C | Temp PL {temp_pl} C | Temp SUP {temp_sup} C") pw.dlog(f"Uptime {uptime} | CPU Load {cpu_load} | Avail Heap {avail_heap}") pw.dlog(f"Number TCs {num_tcs} | Number TMs {num_tms}") pw.dlog(f"SOC state {SocState(soc_state)}") pw.dlog(f"NVM 01 State {nvm_0_1_state}") pw.dlog(f"NVM 3 State {nvm_3_state}") pw.dlog(f"Mission IO state {mission_io_state}") pw.dlog(f"FMC state {fmc_state}") pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 13)) elif set_id == SetIds.BOOT_STATUS_REPORT: fmt_str = "!BBIIBBBBBB" inc_len = struct.calcsize(fmt_str) ( soc_state, power_cycles, boot_after_ms, boot_timeout_ms, active_nvm, bp_0_state, bp_1_state, bp_2_state, boot_state, boot_cycles, ) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len]) current_idx += inc_len pw.dlog( "SoC state (0:off, 1:booting, 2:update, 3:operating, 4:shutdown, 5:reset):" f" {soc_state}" ) pw.dlog(f"Power Cycles {power_cycles}") pw.dlog(f"Boot after {boot_after_ms} ms | Boot timeout {boot_timeout_ms} ms") pw.dlog(f"Active NVM: {active_nvm}") pw.dlog( f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}" ) pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10) else: pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}") pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]")