"""Telecommand packing and telemetry handling helpers for the core controller."""
import enum
import logging
import os
import struct
from pathlib import Path
from typing import Tuple

from spacepackets.ecss import PusTelecommand
from tmtccmd.config import CmdTreeNode, TmtcDefinitionWrapper
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.pus.s20_fsfw_param import (
    create_scalar_u8_parameter,
    create_load_param_cmd,
)
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.pus.s11_tc_sched import (
    create_enable_tc_sched_cmd,
    create_disable_tc_sched_cmd,
)

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper

_LOGGER = logging.getLogger(__name__)


class SdState(enum.IntEnum):
    """Numeric SD card state codes."""

    OFF = 0
    ON = 1
    MOUNTED = 2


class SdCardSelect(enum.IntEnum):
    """Numeric SD card selection codes."""

    SD_0 = 0
    SD_1 = 1
    BOTH = 2
    NONE = 3


class ActionId(enum.IntEnum):
    """Action IDs used for action commands sent to ``CORE_CONTROLLER_ID``."""

    ANNOUNCE_VERSION = 1
    ANNOUNCE_CURRENT_IMAGE = 2
    ANNOUNCE_BOOT_COUNTS = 3
    SWITCH_REBOOT_FILE_HANDLING = 5
    RESET_REBOOT_COUNTER = 6
    SWITCH_IMG_LOCK = 7
    SET_MAX_REBOOT_CNT = 8
    READ_REBOOT_MECHANISM_INFO = 9
    UPDATE_OBSW_FROM_SD_0 = 10
    UPDATE_OBSW_FROM_SD_1 = 11
    UPDATE_OBSW_FROM_TMP = 12
    SWITCH_TO_SD_0 = 16
    SWITCH_TO_SD_1 = 17
    SWITCH_TO_BOTH_SD_CARDS = 18
    AUTO_SWITCH_ENABLE = 19
    AUTO_SWITCH_DISABLE = 20
    XSC_REBOOT = 32
    FULL_REBOOT = 34
    EXECUTE_SHELL_CMD_BLOCKING = 40
    EXECUTE_SHELL_CMD_NON_BLOCKING = 41
    SYSTEMCTL_CMD_EXECUTOR = 42
    LIST_DIR_INTO_FILE = 50
    LIST_DIR_DUMP_DIRECTLY = 51
    CP_HELPER = 52
    MV_HELPER = 53
    RM_HELPER = 54
    MKDIR_HELPER = 55
    ENABLE_SCHEDULER = 56


class ParamId(enum.IntEnum):
    """Unique parameter IDs used with the parameter load command."""

    PREF_SD = 0


class SetId(enum.IntEnum):
    """Housekeeping set IDs of the core controller."""

    HK = 5


class OpCode:
    """Operation code strings dispatched on by :func:`pack_core_commands`.

    Every attribute name must also exist on :class:`Info`, because
    :func:`create_core_node` pairs both classes by attribute name.
    """

    ANNOUNCE_VERSION = "announce_version"
    ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
    ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
    EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
    EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
    SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
    LIST_DIR_INTO_FILE = "list_dir_into_file"
    LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly"
    CP_HELPER = "cp_helper"
    MV_HELPER = "mv_helper"
    RM_HELPER = "rm_helper"
    MKDIR_HELPER = "mkdir_helper"
    SET_PREF_SD = "set_pref_sd"
    REBOOT_XSC = "reboot_xsc"
    XSC_REBOOT_SELF = "reboot_self"
    XSC_REBOOT_0_0 = "reboot_00"
    XSC_REBOOT_0_1 = "reboot_01"
    XSC_REBOOT_1_0 = "reboot_10"
    XSC_REBOOT_1_1 = "reboot_11"
    REBOOT_FULL = "reboot_regular"
    GET_HK = "get_hk"
    OBSW_UPDATE_FROM_SD_0 = "obsw_update_sd0"
    OBSW_UPDATE_FROM_SD_1 = "obsw_update_sd1"
    OBSW_UPDATE_FROM_TMP = "obsw_update_tmp"
    SWITCH_TO_SD_0 = "switch_to_sd_0"
    SWITCH_TO_SD_1 = "switch_to_sd_1"
    SWITCH_TO_BOTH_SD_CARDS = "switch_to_both_sd_cards"
    READ_REBOOT_MECHANISM_INFO = "rbh_info"
    ENABLE_REBOOT_FILE_HANDLING = "rwd_on"
    DISABLE_REBOOT_FILE_HANDLING = "rwd_off"
    RESET_ALL_REBOOT_COUNTERS = "rwd_reset_a"
    RWD_RESET_REBOOT_COUNTER_00 = "rwd_reset_00"
    RWD_RESET_REBOOT_COUNTER_01 = "rwd_reset_01"
    RWD_RESET_REBOOT_COUNTER_10 = "rwd_reset_10"
    RWD_RESET_REBOOT_COUNTER_11 = "rwd_reset_11"
    RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
    AUTO_SWITCH_ENABLE = "auto_switch_enable"
    AUTO_SWITCH_DISABLE = "auto_switch_disable"
    ENABLE_SCHEDULER = "enable_scheduler"
    DISABLE_SCHEDULER = "disable_scheduler"


class Info:
    """Human-readable descriptions, keyed by the same names as :class:`OpCode`."""

    ANNOUNCE_VERSION = "Announce version"
    ANNOUNCE_CURRENT_IMAGE = "Announce current image"
    ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
    SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
    EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
    EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"
    SET_PREF_SD = "Set preferred SD card"
    REBOOT_XSC = "XSC reboot with prompt"
    REBOOT_FULL = "Full regular reboot"
    XSC_REBOOT_SELF = "Reboot Self"
    XSC_REBOOT_0_0 = "Reboot to 0 0"
    XSC_REBOOT_0_1 = "Reboot to 0 1"
    XSC_REBOOT_1_0 = "Reboot to 1 0"
    XSC_REBOOT_1_1 = "Reboot to 1 1"
    OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
    OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
    OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
    READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
    SWITCH_TO_SD_0 = "Switch to SD card 0"
    SWITCH_TO_SD_1 = "Switch to SD card 1"
    SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
    LIST_DIR_INTO_FILE = "List directory, dump output into file"
    LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly"
    CP_HELPER = "Filesystem Copy Helper"
    MV_HELPER = "Filesystem Move Helper"
    RM_HELPER = "Filesystem Removal Helper"
    MKDIR_HELPER = "Filesystem Directory Creation Helper"
    ENABLE_REBOOT_FILE_HANDLING = "Enable reboot file handling"
    DISABLE_REBOOT_FILE_HANDLING = "Disable reboot file handling"
    RESET_ALL_REBOOT_COUNTERS = "Reset all reboot counters"
    RWD_RESET_REBOOT_COUNTER_00 = "Reset reboot counter 0 0"
    # Fixed copy-paste error: this entry previously read "0 0".
    RWD_RESET_REBOOT_COUNTER_01 = "Reset reboot counter 0 1"
    RWD_RESET_REBOOT_COUNTER_10 = "Reset reboot counter 1 0"
    GET_HK = "Get HK set"
    RWD_RESET_REBOOT_COUNTER_11 = "Reset reboot counter 1 1"
    # Previously held the op code string ("rwd_max_cnt") instead of a description.
    RWD_SET_MAX_REBOOT_CNT = "Set max reboot count for reboot watchdog"
    AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
    AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
    ENABLE_SCHEDULER = "Enable scheduler"
    DISABLE_SCHEDULER = "Disable scheduler"


class Chip(enum.IntEnum):
    """Chip selection byte used in reboot, auto-switch and update commands."""

    CHIP_0 = 0
    CHIP_1 = 1
    NONE = 2


class Copy(enum.IntEnum):
    """Copy selection byte used in reboot, auto-switch and update commands."""

    COPY_0_NOM = 0
    COPY_1_GOLD = 1
    NONE = 2


class SystemctlCmd(enum.IntEnum):
    """Command type byte placed first in the systemctl action command data."""

    START = 0
    STOP = 1
    RESTART = 2


def create_core_node() -> CmdTreeNode:
    """Build the ``core`` command tree node with one child per op code.

    :class:`OpCode` and :class:`Info` are paired by attribute name, so every
    public attribute of :class:`OpCode` must have a counterpart on
    :class:`Info`.
    """
    node = CmdTreeNode("core", "Core Controller", hide_children_for_print=True)
    # dir() yields the attribute names sorted; dunder attributes are skipped.
    for key in dir(OpCode):
        if key.startswith("__"):
            continue
        node.add_child(CmdTreeNode(getattr(OpCode, key), getattr(Info, key)))
    return node


@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
    """Register all core controller op codes as a service on ``defs``."""
    oce = OpCodeEntry()
    oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION)
    oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE)
    oce.add(keys=OpCode.ANNOUNCE_BOOT_COUNTS, info=Info.ANNOUNCE_BOOT_COUNTS)
    oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
    oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
    oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self")
    oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0")
    oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
    oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
    oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
    oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
    oce.add(
        keys=OpCode.READ_REBOOT_MECHANISM_INFO, info=Info.READ_REBOOT_MECHANISM_INFO
    )
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
    oce.add(keys=OpCode.AUTO_SWITCH_ENABLE, info=Info.AUTO_SWITCH_ENABLE)
    oce.add(keys=OpCode.AUTO_SWITCH_DISABLE, info=Info.AUTO_SWITCH_DISABLE)
    oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR)
    oce.add(
        keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING
    )
    oce.add(
        keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING,
        info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING,
    )
    oce.add(keys=OpCode.GET_HK, info="Request housekeeping set")
    oce.add(
        keys=OpCode.ENABLE_REBOOT_FILE_HANDLING,
        info="Enable reboot file handling",
    )
    oce.add(
        keys=OpCode.DISABLE_REBOOT_FILE_HANDLING,
        info="Disable reboot file handling",
    )
    oce.add(keys=OpCode.RESET_ALL_REBOOT_COUNTERS, info="Reset all reboot counters")
    oce.add(keys=OpCode.RWD_RESET_REBOOT_COUNTER_00, info="Reset reboot counter 0 0")
    oce.add(keys=OpCode.RWD_RESET_REBOOT_COUNTER_01, info="Reset reboot counter 0 1")
    oce.add(keys=OpCode.RWD_RESET_REBOOT_COUNTER_10, info="Reset reboot counter 1 0")
    oce.add(keys=OpCode.RWD_RESET_REBOOT_COUNTER_11, info="Reset reboot counter 1 1")
    oce.add(
        keys=OpCode.RWD_SET_MAX_REBOOT_CNT,
        info="Reset max reboot count for reboot watchdog",
    )
    # The OBSW update and XSC reboot op codes used to be registered a second
    # time further down; the redundant registrations were dropped.
    oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
    oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
    oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)
    oce.add(keys=OpCode.LIST_DIR_INTO_FILE, info=Info.LIST_DIR_INTO_FILE)
    oce.add(keys=OpCode.LIST_DIR_DUMP_DIRECTLY, info=Info.LIST_DIR_DUMP_DIRECTLY)
    oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER)
    oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
    oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
    oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
    oce.add(keys=OpCode.ENABLE_SCHEDULER, info=Info.ENABLE_SCHEDULER)
    oce.add(keys=OpCode.DISABLE_SCHEDULER, info=Info.DISABLE_SCHEDULER)
    defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)


def _prompt_binary_flag(prompt: str) -> int:
    """Prompt on stdin for a 0/1 option and return it as an int.

    :raises ValueError: If the input is not an integer or is neither 0 nor 1.
    """
    value = int(input(prompt))
    if value not in [0, 1]:
        raise ValueError("Invalid value, only 0 or 1 allowed")
    return value


def pack_core_commands(  # noqa: C901 , complexity okay here
    q: DefaultPusQueueHelper, cmd_str: str
):
    """Queue the telecommand(s) matching the op code ``cmd_str`` on ``q``.

    Several op codes prompt interactively on stdin for extra parameters.
    An unknown op code is only logged as a warning; nothing is queued.

    :param q: Queue helper receiving log entries and PUS telecommands.
    :param cmd_str: One of the :class:`OpCode` strings.
    :raises ValueError: If a prompted value is not an integer or is outside
        the allowed range.
    """
    if cmd_str == OpCode.ANNOUNCE_VERSION:
        q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
        q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
    elif cmd_str == OpCode.ANNOUNCE_CURRENT_IMAGE:
        q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
        )
    elif cmd_str == OpCode.ANNOUNCE_BOOT_COUNTS:
        q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}")
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
        )
    elif cmd_str == OpCode.REBOOT_XSC:
        reboot_self, chip_select, copy_select = determine_reboot_params()
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=reboot_self,
            chip=chip_select,
            copy=copy_select,
        )
    elif cmd_str == OpCode.REBOOT_FULL:
        q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
            )
        )
    elif cmd_str == OpCode.XSC_REBOOT_SELF:
        add_xsc_reboot_cmd(q=q, reboot_self=True)
    elif cmd_str == OpCode.SYSTEMCTL_CMD_EXECUTOR:
        print("systemctl command types: ")
        for entry in SystemctlCmd:
            print(f"{entry}: {entry.name}")
        systemctl_cmd = SystemctlCmd(
            int(input("Specify systemctl command type by key: "))
        )
        unit_name = input("Specify unit name: ")
        q.add_pus_tc(create_systemctl_cmd(systemctl_cmd, unit_name))
    elif cmd_str == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
        custom_cmd = input("Please specify command to execute: ")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
                user_data=custom_cmd.encode(),
            )
        )
    elif cmd_str == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
        custom_cmd = input("Please specify command to execute: ")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
                user_data=custom_cmd.encode(),
            )
        )
    elif cmd_str == OpCode.XSC_REBOOT_0_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
        )
    elif cmd_str == OpCode.XSC_REBOOT_0_1:
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=False,
            chip=Chip.CHIP_0,
            copy=Copy.COPY_1_GOLD,
        )
    elif cmd_str == OpCode.XSC_REBOOT_1_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
        )
    elif cmd_str == OpCode.XSC_REBOOT_1_1:
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=False,
            chip=Chip.CHIP_1,
            copy=Copy.COPY_1_GOLD,
        )
    elif cmd_str == OpCode.READ_REBOOT_MECHANISM_INFO:
        q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
            )
        )
    elif cmd_str == OpCode.DISABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Disabling reboot file handling")
        user_data = bytearray([0])
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=user_data,
            )
        )
    elif cmd_str == OpCode.ENABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Enabling reboot file handling")
        user_data = bytearray([1])
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=user_data,
            )
        )
    elif cmd_str == OpCode.RESET_ALL_REBOOT_COUNTERS:
        q.add_log_cmd("Resetting all reboot counters")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.RESET_REBOOT_COUNTER,
            )
        )
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_00:
        reset_specific_boot_counter(q, 0, 0)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_01:
        reset_specific_boot_counter(q, 0, 1)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_10:
        reset_specific_boot_counter(q, 1, 0)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_11:
        reset_specific_boot_counter(q, 1, 1)
    elif cmd_str == OpCode.RWD_SET_MAX_REBOOT_CNT:
        max_count = int(input("Set new maximum reboot threshold [1, 50]: "))
        if max_count < 1 or max_count > 50:
            raise ValueError("Invalid value, must be in range 1 to 50")
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID,
                ActionId.SET_MAX_REBOOT_CNT,
                user_data=bytes([max_count]),
            )
        )
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_0:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_1:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_TMP:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
    elif cmd_str == OpCode.AUTO_SWITCH_ENABLE:
        q.add_log_cmd(Info.AUTO_SWITCH_ENABLE)
        chip, copy = determine_chip_and_copy()
        user_data = bytes([chip, copy])
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_ENABLE, user_data
            )
        )
    elif cmd_str == OpCode.AUTO_SWITCH_DISABLE:
        q.add_log_cmd(Info.AUTO_SWITCH_DISABLE)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_DISABLE)
        )
    elif cmd_str == OpCode.SWITCH_TO_SD_0:
        q.add_log_cmd(Info.SWITCH_TO_SD_0)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
            )
        )
    elif cmd_str == OpCode.SWITCH_TO_SD_1:
        q.add_log_cmd(Info.SWITCH_TO_SD_1)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
            )
        )
    elif cmd_str == OpCode.SWITCH_TO_BOTH_SD_CARDS:
        # Keep asking until a valid card index is entered. The loop only
        # terminates on valid input, so an invalid value is never packed.
        while True:
            active_sd_card = int(input("Please specify active SD card [0/1]: "))
            if active_sd_card in [0, 1]:
                break
            _LOGGER.warning("Invalid SD card specified. Try again")
        q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
                user_data=bytes([active_sd_card]),
            )
        )
    elif cmd_str == OpCode.GET_HK:
        q.add_log_cmd("Requesting housekeeping set")
        sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
        q.add_pus_tc(generate_one_hk_command(sid))
    elif cmd_str == OpCode.SET_PREF_SD:
        q.add_log_cmd("Set preferred SD card")
        pref_sd = int(
            input("Specify which SD card to set as the preferred one (0/1): ")
        )
        if pref_sd not in [0, 1]:
            raise ValueError("Only 0 or 1 allowed for preferred SD card")
        q.add_pus_tc(
            create_load_param_cmd(
                create_scalar_u8_parameter(
                    object_id=CORE_CONTROLLER_ID,
                    domain_id=0,
                    unique_id=ParamId.PREF_SD,
                    parameter=pref_sd,
                )
            )
        )
    elif cmd_str == OpCode.CP_HELPER:
        cp_recursive = _prompt_binary_flag("Copy recursively (0/1) ?: ")
        cp_force = _prompt_binary_flag("Copy with force option(0/1) ?: ")
        # Layout: recursive flag, force flag, then NUL-terminated source and
        # destination paths.
        user_data = bytearray([cp_recursive, cp_force])
        user_data.extend(packet_source_dest_path("Copy"))
        q.add_log_cmd(Info.CP_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data)
        )
    elif cmd_str == OpCode.MV_HELPER:
        user_data = packet_source_dest_path("Move")
        q.add_log_cmd(Info.MV_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data)
        )
    elif cmd_str == OpCode.RM_HELPER:
        rm_recursive = _prompt_binary_flag(
            "Remove with recursive (-r) option (0/1) ?: "
        )
        rm_force = _prompt_binary_flag("Remove with force (-f) option (0/1) ?: ")
        user_data = bytearray([rm_recursive, rm_force])
        removed_file_or_dir = input("Specify absolute path to be removed: ")
        user_data.extend(removed_file_or_dir.encode())
        user_data.append(0)  # NUL terminator
        q.add_log_cmd(Info.RM_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
        )
    elif cmd_str == OpCode.LIST_DIR_INTO_FILE:
        q.add_log_cmd(Info.LIST_DIR_INTO_FILE)
        user_data = list_directory_base_user_data()
        dest_file_path = input("Destination file path: ")
        user_data.extend(dest_file_path.encode())
        user_data.append(0)  # NUL terminator
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data
            )
        )
    elif cmd_str == OpCode.LIST_DIR_DUMP_DIRECTLY:
        q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY)
        user_data = list_directory_base_user_data()
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
            )
        )
    elif cmd_str == OpCode.MKDIR_HELPER:
        q.add_log_cmd(Info.MKDIR_HELPER)
        dir_path = input("Specify absolute path of newly created directory: ")
        user_data = bytearray(dir_path.encode())
        user_data.append(0)  # NUL terminator
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
        )
    elif cmd_str == OpCode.ENABLE_SCHEDULER:
        q.add_log_cmd(Info.ENABLE_SCHEDULER)
        q.add_pus_tc(create_enable_tc_sched_cmd())
    elif cmd_str == OpCode.DISABLE_SCHEDULER:
        q.add_log_cmd(Info.DISABLE_SCHEDULER)
        q.add_pus_tc(create_disable_tc_sched_cmd())
    else:
        _LOGGER.warning(
            f"Unknown operation code {cmd_str} for core controller commands"
        )


def create_systemctl_cmd(systemctl_cmd: SystemctlCmd, unit_name: str):
    """Create the systemctl executor action command.

    The command data is the command type byte followed by the encoded unit
    name (no terminator is appended here).
    """
    cmd_data = bytearray([systemctl_cmd])
    cmd_data.extend(unit_name.encode())
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID,
        action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR,
        user_data=cmd_data,
    )


def list_directory_base_user_data() -> bytearray:
    """Prompt for the directory listing options and pack them.

    :return: The three option bytes (all, recursive, compression) followed by
        the NUL-terminated listing path.
    :raises ValueError: If an option is not an integer or is neither 0 nor 1.
    """
    all_opt = _prompt_binary_flag("Use all (-a) option (0/1) ?: ")
    recursive_opt = _prompt_binary_flag("Use recursive (-R) option (0/1) ?: ")
    compression_opt = _prompt_binary_flag("Compress target file (0/1) ?: ")
    listing_path = input("Specify listing path (absolute path): ")
    user_data = bytearray([all_opt, recursive_opt, compression_opt])
    user_data.extend(listing_path.encode())
    user_data.append(0)  # NUL terminator
    return user_data


def packet_source_dest_path(context: str) -> bytearray:
    """Prompt for a source and destination path and pack both NUL-terminated.

    :param context: Verb inserted into the prompts (e.g. ``"Copy"``).
    :return: ``<source>\\0<dest>\\0`` as a bytearray.
    """
    source = input(f"Specify {context} source file: ")
    dest = input(f"Specify {context} destination file: ")
    raw_src_dest = bytearray(source.encode())
    raw_src_dest.append(0)
    raw_src_dest.extend(dest.encode())
    raw_src_dest.append(0)
    return raw_src_dest


def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
    """Queue a reboot counter reset for the image given by ``chip``/``copy``."""
    q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
    q.add_pus_tc(
        create_action_cmd(
            object_id=CORE_CONTROLLER_ID,
            action_id=ActionId.RESET_REBOOT_COUNTER,
            user_data=bytes([chip, copy]),
        )
    )


def create_full_reboot_cmds() -> PusTelecommand:
    """Create the full reboot action command (no user data)."""
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
    )


def determine_reboot_params() -> Tuple[bool, Chip, Copy]:
    """Ask interactively whether to reboot the running image or a specific one.

    :return: ``(True, Chip.NONE, Copy.NONE)`` for a self reboot, otherwise
        ``(False, chip, copy)`` with the prompted selection.
    """
    reboot_self = input("Reboot self? [y/n]: ")
    # Case-insensitive, consistent with the yes/no prompt in pack_obsw_update_cmd.
    if reboot_self.lower() in ["y", "yes", "1"]:
        _LOGGER.info("Rebooting currently running image")
        return True, Chip.NONE, Copy.NONE
    _LOGGER.info("Rebooting image specified by chip and copy")
    chip, copy = determine_chip_and_copy()
    return False, chip, copy


def determine_chip_and_copy() -> Tuple[Chip, Copy]:
    """Prompt until a valid chip (0/1) and copy (0/1) have been entered."""
    while True:
        chip_input = input("Chip select [0/1]: ")
        if chip_input in ["0", "1"]:
            chip_select = Chip.CHIP_0 if chip_input == "0" else Chip.CHIP_1
            break
        _LOGGER.warning("Invalid chip select value. Try again")
    while True:
        copy_input = input("Copy select [0/1]: ")
        if copy_input in ["0", "1"]:
            copy_select = Copy.COPY_0_NOM if copy_input == "0" else Copy.COPY_1_GOLD
            break
        _LOGGER.warning("Invalid copy select value. Try again")
    return chip_select, copy_select


def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
    """Create an OBSW update action command for the given ``action_id``.

    Prompts for the target chip and copy, and optionally for a custom file
    name that is appended to the two selection bytes.
    """
    chip, copy = determine_chip_and_copy()
    user_data = bytearray([chip, copy])
    custom_file_name = input("Use custom filename [y/n] ?: ")
    if custom_file_name.lower() in ["y", "yes", "1"]:
        custom_file_name = input("Specify custom filename: ")
        user_data.extend(custom_file_name.encode())
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data
    )


def add_xsc_reboot_cmd(
    q: DefaultPusQueueHelper,
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
):
    """Queue an XSC reboot command together with a matching log entry."""
    if reboot_self:
        q.add_log_cmd("Packing reboot command for current image")
    else:
        q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}")
    q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy))


def create_xsc_reboot_cmds(
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
) -> PusTelecommand:
    """Create the XSC reboot action command.

    The user data is a single ``1`` byte for a self reboot, otherwise a ``0``
    byte followed by the chip and copy bytes.
    """
    tc_data = bytearray()
    if reboot_self:
        tc_data.append(True)
    else:
        tc_data.append(False)
        tc_data.append(chip)
        tc_data.append(copy)
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data
    )


def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
    """Unpack and log the core housekeeping set; other set IDs are ignored."""
    if set_id == SetId.HK:
        # Three big-endian 32-bit floats.
        fmt_str = "!fff"
        inc_len = struct.calcsize(fmt_str)
        (temperature, ps_voltage, pl_voltage) = struct.unpack(
            fmt_str, hk_data[:inc_len]
        )
        printout = (
            f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
            f"PL Voltage [mV] {pl_voltage}"
        )
        pw.dlog(printout)
        # NOTE(review): the return value is discarded here. If the helper
        # returns the formatted validity string rather than printing it, it
        # should probably be passed to pw.dlog() - confirm against tmtccmd.
        FsfwTmTcPrinter.get_validity_buffer(
            validity_buffer=hk_data[inc_len:], num_vars=3
        )


def handle_core_ctrl_action_replies(
    action_id: int, pw: PrintWrapper, custom_data: bytes
):
    """Dispatch an action data reply to its handler; unknown IDs are ignored."""
    if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
        handle_reboot_mechanism_info_reply(pw, custom_data)
    elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
        handle_list_dir_dump_reply(pw, custom_data)


def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
    """Unpack and log the reboot mechanism information reply.

    :raises ValueError: If ``custom_data`` is shorter than the expected layout.
    """
    pw.dlog("Received reboot mechansm information")
    # One u8, five u32 and eight u8 fields, all big-endian.
    fmt_str = "!BIIIIIBBBBBBBB"
    inc_len = struct.calcsize(fmt_str)
    if len(custom_data) < inc_len:
        raise ValueError(f"Received custom data shorter than expected {inc_len}")
    (
        enabled,
        max_count,
        img00_count,
        img01_count,
        img10_count,
        img11_count,
        img00_lock,
        img01_lock,
        img10_lock,
        img11_lock,
        last_chip,
        last_copy,
        next_chip,
        next_copy,
    ) = struct.unpack(fmt_str, custom_data[:inc_len])
    pw.dlog(f"Enabled: {enabled}")
    pw.dlog(f"Max Count: {max_count}")
    pw.dlog(f"Count 00: {img00_count}")
    pw.dlog(f"Count 01: {img01_count}")
    pw.dlog(f"Count 10: {img10_count}")
    pw.dlog(f"Count 11: {img11_count}")
    pw.dlog(f"Lock 00: {img00_lock}")
    pw.dlog(f"Lock 01: {img01_lock}")
    pw.dlog(f"Lock 10: {img10_lock}")
    pw.dlog(f"Lock 11: {img11_lock}")
    pw.dlog(f"Last Chip: {last_chip}")
    pw.dlog(f"Last Copy: {last_copy}")
    pw.dlog(f"Next Chip: {next_chip}")
    pw.dlog(f"Next Copy: {next_copy}")


def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
    """Handle one chunk of a directly dumped directory listing.

    Layout read from ``custom_data``: u32 sequence index, u32 total chunk
    count, u8 compression flag, NUL-terminated ls command, file data.
    Chunks are appended to ``dir_listing.txt`` (or ``dir_listing.txt.gz`` when
    compressed) in the current working directory; the file is removed first
    when chunk 0 arrives. After the last chunk of an uncompressed dump the
    complete listing is printed.
    """
    # Fixed header: 4 (sequence index) + 4 (total chunks) + 1 (compression flag).
    header_len = 9
    if len(custom_data) < header_len:
        _LOGGER.warning("Data unexpectedly small")
        return
    seq_idx, total_chunks, compressed = struct.unpack(
        "!IIB", custom_data[:header_len]
    )
    ls_cmd = custom_data[header_len:].split(b"\x00")[0].decode()
    # Include length of NULL termination
    file_data_offset = header_len + len(ls_cmd) + 1
    pw.dlog(
        f"Received directory listing dump for ls command {ls_cmd}. "
        f"Chunk {seq_idx + 1}/{total_chunks}"
    )

    def remove_if_exists_and_new(seq_idx_: int, path_: Path):
        # A new dump starts with chunk 0: discard the file of a previous dump.
        if seq_idx_ == 0 and path_.exists():
            os.remove(path_)

    if compressed:
        path = Path("dir_listing.txt.gz")
        remove_if_exists_and_new(seq_idx, path)
        pw.dlog(
            f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
        )
        with open(path, "ab") as listing_file:
            listing_file.write(custom_data[file_data_offset:])
    else:
        path = Path("dir_listing.txt")
        remove_if_exists_and_new(seq_idx, path)
        pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
        with open(path, "a") as listing_file:
            listing_file_str = custom_data[file_data_offset:].decode()
            listing_file.write(listing_file_str)
        # Only the uncompressed dump is readable as text; a compressed dump
        # never writes dir_listing.txt, so printing it would fail or show a
        # stale listing.
        if seq_idx + 1 == total_chunks:
            pw.dlog("Full directory listing: ")
            with open(path, "r") as listing_file:
                print(listing_file.read())