diff --git a/eive_tmtc/tmtc/core.py b/eive_tmtc/tmtc/core.py index 9d96b5c..20bba82 100644 --- a/eive_tmtc/tmtc/core.py +++ b/eive_tmtc/tmtc/core.py @@ -23,7 +23,6 @@ _LOGGER = logging.getLogger(__name__) class ActionId(enum.IntEnum): - LIST_DIR_INTO_FILE = 0 ANNOUNCE_VERSION = 1 ANNOUNCE_CURRENT_IMAGE = 2 ANNOUNCE_BOOT_COUNTS = 3 @@ -42,6 +41,11 @@ class ActionId(enum.IntEnum): EXECUTE_SHELL_CMD_BLOCKING = 40 EXECUTE_SHELL_CMD_NON_BLOCKING = 41 SYSTEMCTL_CMD_EXECUTOR = 42 + LIST_DIR_INTO_FILE = 50 + LIST_DIR_DUMP_DIRECTLY = 51 + CP_HELPER = 52 + MV_HELPER = 53 + RM_HELPER = 54 class ParamId(enum.IntEnum): @@ -59,6 +63,11 @@ class OpCode: EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking" EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking" SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd" + LIST_DIR_INTO_FILE = "list_dir_into_file" + LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly" + CP_HELPER = "cp_helper" + MV_HELPER = "mv_helper" + RM_HELPER = "rm_helper" SET_PREF_SD = "set_pref_sd" REBOOT_XSC = ["reboot_xsc"] XSC_REBOOT_SELF = ["reboot_self"] @@ -100,6 +109,11 @@ class Info: SWITCH_TO_SD_0 = "Switch to SD card 0" SWITCH_TO_SD_1 = "Switch to SD card 1" SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card" + LIST_DIR_INTO_FILE = "List directory, dump output into file" + LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly" + CP_HELPER = "Filesystem Copy Helper" + MV_HELPER = "Filesystem Move Helper" + RM_HELPER = "Filesystem Removal Helper" class Chip(enum.IntEnum): @@ -184,6 +198,11 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0) oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1) oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS) + oce.add(keys=OpCode.LIST_DIR_INTO_FILE, info=Info.LIST_DIR_INTO_FILE) + oce.add(keys=OpCode.LIST_DIR_DUMP_DIRECTLY, info=Info.LIST_DIR_DUMP_DIRECTLY) + oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER) + oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER) + oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) @@ -369,12 +388,86 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): ).pack() ) ) + elif op_code == OpCode.CP_HELPER: + cp_recursive = int(input("Copy recursively (0/1) ?: ")) + if cp_recursive not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + user_data = bytearray([cp_recursive]) + user_data.extend(packet_source_dest_path("Copy")) + q.add_log_cmd(Info.CP_HELPER) + q.add_pus_tc( + create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data) + ) + elif op_code == OpCode.MV_HELPER: + user_data = packet_source_dest_path("Move") + q.add_log_cmd(Info.MV_HELPER) + q.add_pus_tc( + create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data) + ) + elif op_code == OpCode.RM_HELPER: + rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: ")) + if rm_recursive not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + rm_force = int(input("Remove with force (-f) option (0/1) ?: ")) + if rm_force not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + user_data = bytearray([rm_recursive, rm_force]) + user_data.extend(packet_source_dest_path("Copy")) + q.add_log_cmd(Info.RM_HELPER) + q.add_pus_tc( + create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data) + ) + elif op_code == OpCode.LIST_DIR_INTO_FILE: + q.add_log_cmd(Info.LIST_DIR_INTO_FILE) + user_data = list_directory_base_user_data() + dest_file_path = input("Destination file path: ") + user_data.extend(dest_file_path.encode()) + user_data.append(0) + q.add_pus_tc( + create_action_cmd( + CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data + ) + ) + elif op_code == OpCode.LIST_DIR_DUMP_DIRECTLY: + q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY) + user_data = list_directory_base_user_data() + q.add_pus_tc( + create_action_cmd( + CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data + ) + ) else: _LOGGER.warning( f"Unknown operation code {op_code} for core controller commands" ) +def list_directory_base_user_data() -> bytearray: + all_opt = int(input("Use all (-a) option (0/1) ?: ")) + if all_opt not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + recursive_opt = int(input("Use recursive (-R) option (0/1) ?: ")) + if recursive_opt not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + compression_opt = int(input("Compress target file (0/1) ?: ")) + if compression_opt not in [0, 1]: + raise ValueError("Invalid value, only 0 or 1 allowed") + listing_path = input("Specify listing path (absolute path): ") + user_data = bytearray([all_opt, recursive_opt, compression_opt]) + user_data.extend(listing_path.encode()) + user_data.append(0) + + +def packet_source_dest_path(context: str) -> bytes: + source = input(f"{context} source file") + dest = input(f"{context} destination file") + raw_src_dest = bytearray(source.encode()) + raw_src_dest.append(0) + raw_src_dest.extend(dest.encode()) + raw_src_dest.append(0) + return raw_src_dest + + def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int): q.add_log_cmd(f"Resetting boot counter {chip} {copy}") q.add_pus_tc(