add new action commands

This commit is contained in:
Robin Müller 2023-04-15 20:51:08 +02:00
parent d00e4247f6
commit bebde054f4
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC

View File

@ -23,7 +23,6 @@ _LOGGER = logging.getLogger(__name__)
class ActionId(enum.IntEnum):
LIST_DIR_INTO_FILE = 0
ANNOUNCE_VERSION = 1
ANNOUNCE_CURRENT_IMAGE = 2
ANNOUNCE_BOOT_COUNTS = 3
@ -42,6 +41,11 @@ class ActionId(enum.IntEnum):
EXECUTE_SHELL_CMD_BLOCKING = 40
EXECUTE_SHELL_CMD_NON_BLOCKING = 41
SYSTEMCTL_CMD_EXECUTOR = 42
LIST_DIR_INTO_FILE = 50
LIST_DIR_DUMP_DIRECTLY = 51
CP_HELPER = 52
MV_HELPER = 53
RM_HELPER = 54
class ParamId(enum.IntEnum):
@ -59,6 +63,11 @@ class OpCode:
EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
LIST_DIR_INTO_FILE = "list_dir_into_file"
LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly"
CP_HELPER = "cp_helper"
MV_HELPER = "mv_helper"
RM_HELPER = "rm_helper"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = ["reboot_xsc"]
XSC_REBOOT_SELF = ["reboot_self"]
@ -100,6 +109,11 @@ class Info:
SWITCH_TO_SD_0 = "Switch to SD card 0"
SWITCH_TO_SD_1 = "Switch to SD card 1"
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
LIST_DIR_INTO_FILE = "List directory, dump output into file"
LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly"
CP_HELPER = "Filesystem Copy Helper"
MV_HELPER = "Filesystem Move Helper"
RM_HELPER = "Filesystem Removal Helper"
class Chip(enum.IntEnum):
@ -184,6 +198,11 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)
oce.add(keys=OpCode.LIST_DIR_INTO_FILE, info=Info.LIST_DIR_INTO_FILE)
oce.add(keys=OpCode.LIST_DIR_DUMP_DIRECTLY, info=Info.LIST_DIR_DUMP_DIRECTLY)
oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER)
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
@ -369,12 +388,86 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
).pack()
)
)
elif op_code == OpCode.CP_HELPER:
cp_recursive = int(input("Copy recursively (0/1) ?: "))
if cp_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([cp_recursive])
user_data.extend(packet_source_dest_path("Copy"))
q.add_log_cmd(Info.CP_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data)
)
elif op_code == OpCode.MV_HELPER:
user_data = packet_source_dest_path("Move")
q.add_log_cmd(Info.MV_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data)
)
elif op_code == OpCode.RM_HELPER:
rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: "))
if rm_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
rm_force = int(input("Remove with force (-f) option (0/1) ?: "))
if rm_force not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([rm_recursive, rm_force])
user_data.extend(packet_source_dest_path("Copy"))
q.add_log_cmd(Info.RM_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
)
elif op_code == OpCode.LIST_DIR_INTO_FILE:
q.add_log_cmd(Info.LIST_DIR_INTO_FILE)
user_data = list_directory_base_user_data()
dest_file_path = input("Destination file path: ")
user_data.extend(dest_file_path.encode())
user_data.append(0)
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data
)
)
elif op_code == OpCode.LIST_DIR_DUMP_DIRECTLY:
q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY)
user_data = list_directory_base_user_data()
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
)
)
else:
_LOGGER.warning(
f"Unknown operation code {op_code} for core controller commands"
)
def list_directory_base_user_data() -> bytearray:
all_opt = int(input("Use all (-a) option (0/1) ?: "))
if all_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
recursive_opt = int(input("Use recursive (-R) option (0/1) ?: "))
if recursive_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
compression_opt = int(input("Compress target file (0/1) ?: "))
if compression_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
listing_path = input("Specify listing path (absolute path): ")
user_data = bytearray([all_opt, recursive_opt, compression_opt])
user_data.extend(listing_path.encode())
user_data.append(0)
def packet_source_dest_path(context: str) -> bytes:
source = input(f"{context} source file")
dest = input(f"{context} destination file")
raw_src_dest = bytearray(source.encode())
raw_src_dest.append(0)
raw_src_dest.extend(dest.encode())
raw_src_dest.append(0)
return raw_src_dest
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
q.add_pus_tc(