diff --git a/CHANGELOG.md b/CHANGELOG.md index 7633c35..e218b34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,12 @@ list yields a list of all related PRs for each release. # [unreleased] +# [v6.0.0] 2024-01-31 + ## Changed - Bumped `tmccmd` to `v8.0.0rc1` to introduce new command tree handling. +- Added new PLOC SUPV commands to test sets, cleaned up PLOC SUPV commanding. # [v5.13.0] 2024-01-30 diff --git a/eive_tmtc/pus_tm/hk_handler.py b/eive_tmtc/pus_tm/hk_handler.py index 84d6016..aeb2433 100644 --- a/eive_tmtc/pus_tm/hk_handler.py +++ b/eive_tmtc/pus_tm/hk_handler.py @@ -1,6 +1,7 @@ """HK Handling for EIVE OBSW""" import dataclasses import logging +import base64 # noqa import sqlite3 from typing import List, cast from uuid import UUID @@ -73,6 +74,7 @@ def handle_hk_packet( hk_data = tm_packet.tm_data[8:] if named_obj_id in hk_filter.object_ids: + # print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") handle_regular_hk_print( printer=printer, packet_uuid=packet_uuid, diff --git a/eive_tmtc/tmtc/payload/ploc_supervisor.py b/eive_tmtc/tmtc/payload/ploc_supervisor.py index fbb5b8f..f75198f 100644 --- a/eive_tmtc/tmtc/payload/ploc_supervisor.py +++ b/eive_tmtc/tmtc/payload/ploc_supervisor.py @@ -6,6 +6,7 @@ @author J. Meier @date 10.07.2021 """ +from datetime import datetime import enum import logging import struct @@ -129,19 +130,20 @@ class OpCode: ON = "on" NORMAL = "nml" HK_TO_OBC = "hk_to_obc" - REQUEST_HK_SET_FROM_DEV = "req_hk_from_dev" - REQUEST_HK_SET = "req_hk" + REQUEST_GENERIC_HK_SET = "req_generic_hk" START_MPSOC = "start_mpsoc" SHUTDOWN_MPSOC = "stop_mpsoc" SEL_NVM = "sel_nvm" SET_TIME_REF = "set_time_ref" FACTORY_FLASH = "factory_flash" - REQ_BOOT_STATUS_REPORT = "boot_report" START_UPDATE = "start_update" PERFORM_UPDATE = "update" FACTORY_RESET = "factory_reset" MEM_CHECK = "mem_check" RESET_MPSOC = "reset_mpsoc" + SET_GPIO = "set_gpio" + READ_GPIO = "read_gpio" + READ_STATUS_REPORT = "read_status_report" class Info(str, enum.Enum): @@ -149,19 +151,20 @@ class Info(str, enum.Enum): ON = "Switch On" NORMAL = "Switch Normal" HK_TO_OBC = "Request HK from PLOC SUPV" + REQUEST_GENERIC_HK_SET = "Request prompted HK set from PLOC Handler" START_MPSOC = "Start MPSoC" SHUTDOWN_MPSOC = "Shutdown MPSoC" - REQUEST_HK_SET_FROM_DEV = "Request HK set from the device to the PLOC Handler" - REQUEST_HK_SET = "Request HK set from PLOC Handler" SET_TIME_REF = "Set time reference" FACTORY_FLASH = "Factory Flash Mode" PERFORM_UPDATE = "Start or continue MPSoC SW update at starting bytes" START_UPDATE = "Start new MPSoC SW update" FACTORY_RESET = "Factory Reset of loggers" - REQ_BOOT_STATUS_REPORT = "Request boot status report and HK" MEM_CHECK = "Memory Check" SEL_NVM = "Select NVM" RESET_MPSOC = "Reset MPSoC" + SET_GPIO = "Set GPIO" + READ_GPIO = "Read GPIO" + READ_STATUS_REPORT = "Read HK status report" def create_ploc_supv_node() -> CmdTreeNode: @@ -198,7 +201,7 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 q.add_log_cmd(f"{prefix}: {Info.HK_TO_OBC}") command = obyt + struct.pack("!I", SupvActionId.REQUEST_HK_REPORT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - elif cmd_str == OpCode.START_MPSOC: + if cmd_str == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) @@ -206,15 +209,11 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 q.add_log_cmd("PLOC Supervisor: Shutdown MPSoC") command = object_id.as_bytes + struct.pack("!I", SupvActionId.SHUTWOWN_MPSOC) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == OpCode.REQUEST_HK_SET: - q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK_SET}") - sid = make_sid(object_id.as_bytes, prompt_set_id()) - cmd = generate_one_hk_command(sid) - q.add_pus_tc(cmd) - if cmd_str == OpCode.REQUEST_HK_SET_FROM_DEV: - q.add_log_cmd(f"{prefix}: {Info.REQUEST_HK_SET_FROM_DEV}") + if cmd_str == OpCode.READ_STATUS_REPORT: + q.add_log_cmd(f"{prefix}: {Info.READ_STATUS_REPORT}") set_id = prompt_set_id() action_cmd = None + # First read the set from the device. if set_id == SetId.HK_REPORT: action_cmd = create_action_cmd(PLOC_SUPV_ID, SupvActionId.REQUEST_HK_REPORT) if set_id == SetId.ADC_REPORT: @@ -225,8 +224,24 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 action_cmd = create_action_cmd( PLOC_SUPV_ID, SupvActionId.REQUEST_LOGGING_COUNTERS ) - assert action_cmd is not None + if set_id == SetId.LATCHUP_REPORT: + action_cmd = create_action_cmd( + PLOC_SUPV_ID, SupvActionId.GET_LATCHUP_STATUS_REPORT + ) + if set_id == SetId.BOOT_STATUS_REPORT: + action_cmd = create_action_cmd( + PLOC_SUPV_ID, SupvActionId.GET_BOOT_STATUS_REPORT + ) + if action_cmd is None: + _LOGGER.warning(f"invalid set ID {set_id!r} for PLOC SUPV") + return + # Now dump the HK set. + sid = make_sid(object_id.as_bytes, set_id) + req_hk = generate_one_hk_command(sid) q.add_pus_tc(action_cmd) + q.add_wait_seconds(2.0) + q.add_pus_tc(req_hk) + assert action_cmd is not None elif cmd_str == OpCode.START_MPSOC: q.add_log_cmd("PLOC Supervisor: Start MPSoC") command = obyt + struct.pack("!I", SupvActionId.START_MPSOC) @@ -286,32 +301,6 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 + struct.pack("!I", boot_timeout) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "12": - q.add_log_cmd("PLOC Supervisor: Disable HK") - command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_HK) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str in OpCode.REQ_BOOT_STATUS_REPORT: - q.add_log_cmd(f"{prefix}: {Info.REQ_BOOT_STATUS_REPORT}") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.GET_BOOT_STATUS_REPORT - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - q.add_wait_seconds(2.0) - sid = make_sid(object_id.as_bytes, SetId.BOOT_STATUS_REPORT) - req_hk = generate_one_hk_command(sid) - q.add_pus_tc(req_hk) - if cmd_str == "17": - q.add_log_cmd("PLOC Supervisor: Enable latchup alert") - command = pack_lachtup_alert_cmd(object_id.as_bytes, True) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "18": - q.add_log_cmd("PLOC Supervisor: Disable latchup alert") - command = pack_lachtup_alert_cmd(object_id.as_bytes, False) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "20": - q.add_log_cmd("PLOC Supervisor: Set alert limit") - command = pack_set_alert_limit_cmd(object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "23": q.add_log_cmd("PLOC Supervisor: Set ADC enabled channels") command = pack_set_adc_enabled_channels_cmd(object_id.as_bytes) @@ -324,31 +313,11 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 q.add_log_cmd("PLOC Supervisor: Set ADC threshold") command = pack_set_adc_threshold_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "26": - q.add_log_cmd("PLOC Supervisor: Request latchup status report") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.GET_LATCHUP_STATUS_REPORT - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "27": - q.add_log_cmd("PLOC Supervisor: Copy ADC data to MRAM") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.COPY_ADC_DATA_TO_MRAM - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "30": - q.add_log_cmd("PLOC Supervisor: Run auto EM tests") - command = pack_auto_em_tests_cmd(object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "31": - q.add_log_cmd("PLOC Supervisor: Wipe MRAM") - command = pack_mram_wipe_cmd(object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "35": + if cmd_str == OpCode.SET_GPIO: q.add_log_cmd("PLOC Supervisor: Set GPIO command") command = pack_set_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "36": + if cmd_str == OpCode.READ_GPIO: q.add_log_cmd("PLOC Supervisor: Read GPIO command") command = pack_read_gpio_cmd(object_id.as_bytes) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) @@ -384,40 +353,6 @@ def pack_ploc_supv_commands(q: DefaultPusQueueHelper, cmd_str: str): # noqa C90 q.add_log_cmd(f"{prefix}: {Info.FACTORY_FLASH}") command = object_id.as_bytes + struct.pack("!I", SupvActionId.FACTORY_FLASH) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "47": - q.add_log_cmd("PLOC Supervisor: Enable auto TM") - command = object_id.as_bytes + struct.pack("!I", SupvActionId.ENABLE_AUTO_TM) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "48": - q.add_log_cmd("PLOC Supervisor: Disable auto TM") - command = object_id.as_bytes + struct.pack("!I", SupvActionId.DISABLE_AUTO_TM) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "51": - q.add_log_cmd("PLOC Supervisor: Logging request event buffers") - command = pack_logging_buffer_request(object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "52": - q.add_log_cmd("PLOC Supervisor: Logging clear counters") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.LOGGING_CLEAR_COUNTERS - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "53": - q.add_log_cmd("PLOC Supervisor: Logging set topic") - command = pack_logging_set_topic(object_id.as_bytes) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "54": - q.add_log_cmd("PLOC Supervisor: Logging request counters") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.REQUEST_LOGGING_COUNTERS - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if cmd_str == "55": - q.add_log_cmd("PLOC Supervisor: Request ADC report") - command = object_id.as_bytes + struct.pack( - "!I", SupvActionId.REQUEST_ADC_REPORT - ) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if cmd_str == "56": q.add_log_cmd("PLOC Supervisor: Reset PL") command = object_id.as_bytes + struct.pack("!I", SupvActionId.RESET_PL) @@ -728,6 +663,8 @@ def handle_supv_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): handle_adc_report(hk_data) elif set_id == SetId.COUNTERS_REPORT: handle_counters_report(hk_data) + elif set_id == SetId.LATCHUP_REPORT: + handle_latchup_status_report(hk_data) else: pw.dlog(f"PLOC SUPV: HK handling not implemented for set ID {set_id}") pw.dlog(f"Raw Data: 0x[{hk_data.hex(sep=',')}]") @@ -869,3 +806,53 @@ def handle_counters_report(hk_data: bytes): print(f"MM task lost: {mm_task_lost}") print(f"HK task lost: {hk_task_lost}") print(f"DL task lost: {dl_task_lost}") + + +def handle_latchup_status_report(hk_data: bytes): + # 1 byte ID, 7 times 2 bytes of counts, and 8 bytes of time and 1 byte sync status. + if len(hk_data) < 24: + raise ValueError("Latchup status report smaller than expected") + current_idx = 0 + id = hk_data[current_idx] + current_idx += 1 + counts_of_alerts = [] + for _ in range(7): + counts_of_alerts.append( + struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + ) + current_idx += 2 + print(f"ID: {id}") + print(f"Counts of alerts: {counts_of_alerts}") + time_ms = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + current_idx += 2 + time_seconds = hk_data[current_idx] + current_idx += 1 + time_minutes = hk_data[current_idx] + current_idx += 1 + time_hour = hk_data[current_idx] + current_idx += 1 + time_day = hk_data[current_idx] + current_idx += 1 + time_month = hk_data[current_idx] + current_idx += 1 + # Is stored as years since 1900. + time_year = 1900 + hk_data[current_idx] + current_idx += 1 + is_synced = hk_data[current_idx] + print(f"Time Sync Status: {is_synced}") + try: + dt = datetime( + year=time_year, + month=time_month, + day=time_day, + hour=time_hour, + minute=time_minutes, + second=time_seconds, + microsecond=time_ms * 1000, + ) + print(f"Time Now: {dt}") + except ValueError: + print( + f"Time: {time_day}.{time_month}.{time_year}T" + f"{time_hour}:{time_minutes}:{time_seconds}.{time_ms}" + )