import enum import logging import os import struct from pathlib import Path from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.config.definitions import CustomServiceList from spacepackets.ecss import PusTelecommand from tmtccmd.config import TmtcDefinitionWrapper from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_20_fsfw_param import ( create_scalar_u8_parameter, create_load_param_cmd, ) from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter _LOGGER = logging.getLogger(__name__) class ActionId(enum.IntEnum): ANNOUNCE_VERSION = 1 ANNOUNCE_CURRENT_IMAGE = 2 ANNOUNCE_BOOT_COUNTS = 3 SWITCH_REBOOT_FILE_HANDLING = 5 RESET_REBOOT_COUNTER = 6 SWITCH_IMG_LOCK = 7 SET_MAX_REBOOT_CNT = 8 UPDATE_OBSW_FROM_SD_0 = 10 UPDATE_OBSW_FROM_SD_1 = 11 UPDATE_OBSW_FROM_TMP = 12 SWITCH_TO_SD_0 = 16 SWITCH_TO_SD_1 = 17 SWITCH_TO_BOTH_SD_CARDS = 18 XSC_REBOOT = 32 FULL_REBOOT = 34 EXECUTE_SHELL_CMD_BLOCKING = 40 EXECUTE_SHELL_CMD_NON_BLOCKING = 41 SYSTEMCTL_CMD_EXECUTOR = 42 LIST_DIR_INTO_FILE = 50 LIST_DIR_DUMP_DIRECTLY = 51 CP_HELPER = 52 MV_HELPER = 53 RM_HELPER = 54 MKDIR_HELPER = 55 class ParamId(enum.IntEnum): PREF_SD = 0 class SetId(enum.IntEnum): HK = 5 class OpCode: ANNOUNCE_VERSION = "announce_version" ANNOUNCE_CURRENT_IMAGE = "announce_current_image" ANNOUNCE_BOOT_COUNTS = "announce_boot_counts" EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking" EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking" SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd" LIST_DIR_INTO_FILE = "list_dir_into_file" LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly" CP_HELPER = "cp_helper" MV_HELPER = "mv_helper" RM_HELPER = "rm_helper" MKDIR_HELPER = "mkdir_helper" SET_PREF_SD = "set_pref_sd" REBOOT_XSC = ["reboot_xsc"] XSC_REBOOT_SELF = ["reboot_self"] XSC_REBOOT_0_0 = ["reboot_00"] XSC_REBOOT_0_1 = ["reboot_01"] XSC_REBOOT_1_0 = ["reboot_10"] XSC_REBOOT_1_1 = ["reboot_11"] REBOOT_FULL = ["reboot_regular"] GET_HK = ["get_hk"] OBSW_UPDATE_FROM_SD_0 = ["obsw_update_sd0"] OBSW_UPDATE_FROM_SD_1 = ["obsw_update_sd1"] OBSW_UPDATE_FROM_TMP = ["obsw_update_tmp"] SWITCH_TO_SD_0 = ["switch_to_sd_0"] SWITCH_TO_SD_1 = ["switch_to_sd_1"] SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"] ENABLE_REBOOT_FILE_HANDLING = ["rbh_off"] DISABLE_REBOOT_FILE_HANDLING = ["rbh_on"] RESET_ALL_REBOOT_COUNTERS = ["rbh_reset_a"] RESET_REBOOT_COUNTER_00 = ["rbh_reset_00"] RESET_REBOOT_COUNTER_01 = ["rbh_reset_01"] RESET_REBOOT_COUNTER_10 = ["rbh_reset_10"] RESET_REBOOT_COUNTER_11 = ["rbh_reset_11"] SET_MAX_REBOOT_CNT = ["rbh_max_cnt"] class Info: ANNOUNCE_VERSION = "Announce version" ANNOUNCE_CURRENT_IMAGE = "Announce current image" ANNOUNCE_BOOT_COUNTS = "Announce boot counts" SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command" EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking" EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking" SET_PREF_SD = "Set preferred SD card" REBOOT_XSC = "XSC reboot with prompt" REBOOT_FULL = "Full regular reboot" OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0" OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1" OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder" SWITCH_TO_SD_0 = "Switch to SD card 0" SWITCH_TO_SD_1 = "Switch to SD card 1" SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card" LIST_DIR_INTO_FILE = "List directory, dump output into file" LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly" CP_HELPER = "Filesystem Copy Helper" MV_HELPER = "Filesystem Move Helper" RM_HELPER = "Filesystem Removal Helper" MKDIR_HELPER = "Filesystem Directory Creation Helper" class Chip(enum.IntEnum): CHIP_0 = 0 CHIP_1 = 1 NONE = 2 class Copy(enum.IntEnum): COPY_0_NOM = 0 COPY_1_GOLD = 1 NONE = 2 class SystemctlCmd(enum.IntEnum): START = 0 STOP = 1 RESTART = 2 @tmtc_definitions_provider def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION) oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE) oce.add(keys=OpCode.ANNOUNCE_BOOT_COUNTS, info=Info.ANNOUNCE_BOOT_COUNTS) oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC) oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC) oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL) oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self") oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0") oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1") oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0") oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1") oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD) oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR) oce.add( keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING ) oce.add( keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING, info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING, ) oce.add( keys=OpCode.GET_HK, info="Request housekeeping set", ) oce.add( keys=OpCode.ENABLE_REBOOT_FILE_HANDLING, info="Enable reboot file handling", ) oce.add( keys=OpCode.DISABLE_REBOOT_FILE_HANDLING, info="Disable reboot file handling", ) oce.add( keys=OpCode.RESET_ALL_REBOOT_COUNTERS, info="Reset all reboot counters", ) oce.add( keys=OpCode.RESET_REBOOT_COUNTER_00, info="Reset reboot counter 0 0", ) oce.add( keys=OpCode.RESET_REBOOT_COUNTER_01, info="Reset reboot counter 0 1", ) oce.add( keys=OpCode.RESET_REBOOT_COUNTER_10, info="Reset reboot counter 1 0", ) oce.add( keys=OpCode.RESET_REBOOT_COUNTER_11, info="Reset reboot counter 1 1", ) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP) oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0) oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1) oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS) oce.add(keys=OpCode.LIST_DIR_INTO_FILE, info=Info.LIST_DIR_INTO_FILE) oce.add(keys=OpCode.LIST_DIR_DUMP_DIRECTLY, info=Info.LIST_DIR_DUMP_DIRECTLY) oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER) oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER) oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER) oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): if op_code == OpCode.ANNOUNCE_VERSION: q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}") q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION)) elif op_code == OpCode.ANNOUNCE_CURRENT_IMAGE: q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}") q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE) ) elif op_code == OpCode.ANNOUNCE_BOOT_COUNTS: q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}") q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS) ) elif op_code in OpCode.REBOOT_XSC: reboot_self, chip_select, copy_select = determine_reboot_params() add_xsc_reboot_cmd( q=q, reboot_self=reboot_self, chip=chip_select, copy=copy_select, ) elif op_code in OpCode.REBOOT_FULL: q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}") q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT ) ) elif op_code in OpCode.XSC_REBOOT_SELF: add_xsc_reboot_cmd(q=q, reboot_self=True) elif op_code == OpCode.SYSTEMCTL_CMD_EXECUTOR: print("systemctl command types: ") for entry in SystemctlCmd: print(f"{entry}: {entry.name}") systemctl_cmd = SystemctlCmd( int(input("Specify systemctl command type by key: ")) ) unit_name = input("Specify unit name: ") cmd_data = bytearray([systemctl_cmd]) cmd_data.extend(unit_name.encode()) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR, user_data=cmd_data, ) ) elif op_code == OpCode.EXECUTE_SHELL_CMD_BLOCKING: custom_cmd = input("Please specify command to execute: ") q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING, user_data=custom_cmd.encode(), ) ) elif op_code == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING: custom_cmd = input("Please specify command to execute: ") q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING, user_data=custom_cmd.encode(), ) ) elif op_code in OpCode.XSC_REBOOT_0_0: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM ) elif op_code in OpCode.XSC_REBOOT_0_1: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_1_GOLD, ) elif op_code in OpCode.XSC_REBOOT_1_0: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM ) elif op_code in OpCode.XSC_REBOOT_1_1: add_xsc_reboot_cmd( q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_1_GOLD, ) elif op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING: q.add_log_cmd("Disabling reboot file handling") user_data = bytearray([0]) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING, user_data=user_data, ) ) elif op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING: q.add_log_cmd("Enabling reboot file handling") user_data = bytearray([1]) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING, user_data=user_data, ) ) elif op_code in OpCode.RESET_ALL_REBOOT_COUNTERS: q.add_log_cmd("Resetting all reboot counters") q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.RESET_REBOOT_COUNTER, ) ) elif op_code in OpCode.RESET_REBOOT_COUNTER_00: reset_specific_boot_counter(q, 0, 0) elif op_code in OpCode.RESET_REBOOT_COUNTER_01: reset_specific_boot_counter(q, 0, 1) elif op_code in OpCode.RESET_REBOOT_COUNTER_10: reset_specific_boot_counter(q, 1, 0) elif op_code in OpCode.RESET_REBOOT_COUNTER_11: reset_specific_boot_counter(q, 1, 1) elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0: q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0)) elif op_code in OpCode.OBSW_UPDATE_FROM_SD_1: q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1)) elif op_code in OpCode.OBSW_UPDATE_FROM_TMP: q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP)) elif op_code in OpCode.SWITCH_TO_SD_0: q.add_log_cmd(Info.SWITCH_TO_SD_0) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0 ) ) elif op_code in OpCode.SWITCH_TO_SD_1: q.add_log_cmd(Info.SWITCH_TO_SD_1) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1 ) ) elif op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS: while True: active_sd_card = int(input("Please specify active SD card [0/1]: ")) if active_sd_card not in [0, 1]: _LOGGER.warning("Invalid SD card specified. Try again") break q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS) q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS, user_data=bytes([active_sd_card]), ) ) elif op_code in OpCode.GET_HK: q.add_log_cmd("Requesting housekeeping set") sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK) q.add_pus_tc(generate_one_hk_command(sid)) elif op_code in OpCode.SET_PREF_SD: q.add_log_cmd("Set preferred SD card") pref_sd = int( input("Specify which SD card to set as the preferred one (0/1): ") ) if pref_sd not in [0, 1]: raise ValueError("Only 0 or 1 allowed for preferred SD card") q.add_pus_tc( create_load_param_cmd( create_scalar_u8_parameter( object_id=CORE_CONTROLLER_ID, domain_id=0, unique_id=ParamId.PREF_SD, parameter=pref_sd, ).pack() ) ) elif op_code == OpCode.CP_HELPER: cp_recursive = int(input("Copy recursively (0/1) ?: ")) if cp_recursive not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") user_data = bytearray([cp_recursive]) user_data.extend(packet_source_dest_path("Copy")) q.add_log_cmd(Info.CP_HELPER) q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data) ) elif op_code == OpCode.MV_HELPER: user_data = packet_source_dest_path("Move") q.add_log_cmd(Info.MV_HELPER) q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data) ) elif op_code == OpCode.RM_HELPER: rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: ")) if rm_recursive not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") rm_force = int(input("Remove with force (-f) option (0/1) ?: ")) if rm_force not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") user_data = bytearray([rm_recursive, rm_force]) removed_file_or_dir = input("Specify absolute path to be removed: ") user_data.extend(removed_file_or_dir.encode()) user_data.append(0) q.add_log_cmd(Info.RM_HELPER) q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data) ) elif op_code == OpCode.LIST_DIR_INTO_FILE: q.add_log_cmd(Info.LIST_DIR_INTO_FILE) user_data = list_directory_base_user_data() dest_file_path = input("Destination file path: ") user_data.extend(dest_file_path.encode()) user_data.append(0) q.add_pus_tc( create_action_cmd( CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data ) ) elif op_code == OpCode.LIST_DIR_DUMP_DIRECTLY: q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY) user_data = list_directory_base_user_data() q.add_pus_tc( create_action_cmd( CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data ) ) elif op_code == OpCode.MKDIR_HELPER: q.add_log_cmd(Info.MKDIR_HELPER) user_data = input("Specify absolute path of newly created directory: ") user_data = bytearray(user_data.encode()) user_data.append(0) q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data) ) else: _LOGGER.warning( f"Unknown operation code {op_code} for core controller commands" ) def list_directory_base_user_data() -> bytearray: all_opt = int(input("Use all (-a) option (0/1) ?: ")) if all_opt not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") recursive_opt = int(input("Use recursive (-R) option (0/1) ?: ")) if recursive_opt not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") compression_opt = int(input("Compress target file (0/1) ?: ")) if compression_opt not in [0, 1]: raise ValueError("Invalid value, only 0 or 1 allowed") listing_path = input("Specify listing path (absolute path): ") user_data = bytearray([all_opt, recursive_opt, compression_opt]) user_data.extend(listing_path.encode()) user_data.append(0) return user_data def packet_source_dest_path(context: str) -> bytes: source = input(f"Specify {context} source file: ") dest = input(f"Specify {context} destination file: ") raw_src_dest = bytearray(source.encode()) raw_src_dest.append(0) raw_src_dest.extend(dest.encode()) raw_src_dest.append(0) return raw_src_dest def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int): q.add_log_cmd(f"Resetting boot counter {chip} {copy}") q.add_pus_tc( create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.RESET_REBOOT_COUNTER, user_data=bytes([chip, copy]), ) ) def create_full_reboot_cmds() -> PusTelecommand: return create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT ) def determine_reboot_params() -> (bool, Chip, Copy): chip_select = -1 copy_select = -1 reboot_self = input("Reboot self? [y/n]: ") if reboot_self in ["y", "yes", "1"]: _LOGGER.info("Rebooting currently running image") return True, chip_select, copy_select _LOGGER.info("Rebooting image specified by chip and copy") return False, determine_chip_and_copy() def determine_chip_and_copy() -> (int, int): while True: chip_select = input("Chip select [0/1]: ") if chip_select in ["0", "1"]: if chip_select == "0": chip_select = Chip.CHIP_0 else: chip_select = Chip.CHIP_1 break else: _LOGGER.warning("Invalid chip select value. Try again") while True: copy_select = input("Copy select [0/1]: ") if copy_select in ["0", "1"]: if copy_select == "0": copy_select = Copy.COPY_0_NOM else: copy_select = Copy.COPY_1_GOLD break else: _LOGGER.warning("Invalid copy select value. Try again") return chip_select, copy_select def pack_obsw_update_cmd(action_id: int) -> PusTelecommand: chip, copy = determine_chip_and_copy() user_data = bytearray([chip, copy]) custom_file_name = input("Use custom filename [y/n] ?: ") if custom_file_name.lower() in ["y", "yes", "1"]: custom_file_name = input("Specify custom filename: ") user_data.extend(custom_file_name.encode()) return create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data ) def add_xsc_reboot_cmd( q: DefaultPusQueueHelper, reboot_self: bool, chip: Chip = Chip.NONE, copy: Copy = Copy.NONE, ): if reboot_self: q.add_log_cmd("Packing reboot command for current image") else: q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}") q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy)) def create_xsc_reboot_cmds( reboot_self: bool, chip: Chip = Chip.NONE, copy: Copy = Copy.NONE, ) -> PusTelecommand: tc_data = bytearray() if reboot_self: tc_data.append(True) else: tc_data.append(False) tc_data.append(chip) tc_data.append(copy) return create_action_cmd( object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data ) def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): if set_id == SetId.HK: pw = PrintWrapper(printer) fmt_str = "!fff" inc_len = struct.calcsize(fmt_str) (temperature, ps_voltage, pl_voltage) = struct.unpack( fmt_str, hk_data[0 : 0 + inc_len] ) printout = ( f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " f"PL Voltage [mV] {pl_voltage}" ) pw.dlog(printout) printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3) def handle_core_ctrl_action_replies( action_id: int, printer: FsfwTmTcPrinter, custom_data: bytes ): pw = PrintWrapper(printer) if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY: if len(custom_data) < 4: _LOGGER.warning("Data unexpectedly small") return seq_idx = struct.unpack("!I", custom_data[0:4])[0] total_chunks = struct.unpack("!I", custom_data[4:8])[0] compressed = custom_data[8] ls_cmd = custom_data[9:].split(b"\x00")[0].decode() # Include length of NULL termination file_data_offset = 9 + len(ls_cmd) + 1 pw.dlog( f"Received directory listing dump for ls command {ls_cmd}. " f"Chunk {seq_idx + 1}/{total_chunks}" ) def remove_if_exists_and_new(seq_idx_: int, path_: Path): if seq_idx_ == 0 and path_.exists(): os.remove(path_) if compressed: path = Path("dir_listing.txt.gz") remove_if_exists_and_new(seq_idx, path) pw.dlog( f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz" ) with open(path, "ab") as listing_file: listing_file.write(custom_data[file_data_offset:]) else: path = Path("dir_listing.txt") remove_if_exists_and_new(seq_idx, path) pw.dlog( f"Compression option: {compressed}. Dumping file into dir_listing.txt" ) with open(path, "a") as listing_file: listing_file_str = custom_data[file_data_offset:].decode() listing_file.write(listing_file_str) if seq_idx + 1 == total_chunks: pw.dlog("Full directory listing: ") with open("dir_listing.txt", "r") as listing_file: print(listing_file.read())