|
|
|
@@ -1,6 +1,8 @@
|
|
|
|
|
import enum
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import struct
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
|
|
|
|
|
|
|
@@ -46,6 +48,7 @@ class ActionId(enum.IntEnum):
|
|
|
|
|
CP_HELPER = 52
|
|
|
|
|
MV_HELPER = 53
|
|
|
|
|
RM_HELPER = 54
|
|
|
|
|
MKDIR_HELPER = 55
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ParamId(enum.IntEnum):
|
|
|
|
@@ -68,6 +71,7 @@ class OpCode:
|
|
|
|
|
CP_HELPER = "cp_helper"
|
|
|
|
|
MV_HELPER = "mv_helper"
|
|
|
|
|
RM_HELPER = "rm_helper"
|
|
|
|
|
MKDIR_HELPER = "mkdir_helper"
|
|
|
|
|
SET_PREF_SD = "set_pref_sd"
|
|
|
|
|
REBOOT_XSC = ["reboot_xsc"]
|
|
|
|
|
XSC_REBOOT_SELF = ["reboot_self"]
|
|
|
|
@@ -114,6 +118,7 @@ class Info:
|
|
|
|
|
CP_HELPER = "Filesystem Copy Helper"
|
|
|
|
|
MV_HELPER = "Filesystem Move Helper"
|
|
|
|
|
RM_HELPER = "Filesystem Removal Helper"
|
|
|
|
|
MKDIR_HELPER = "Filesystem Directory Creation Helper"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Chip(enum.IntEnum):
|
|
|
|
@@ -203,6 +208,7 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
|
|
|
|
oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER)
|
|
|
|
|
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
|
|
|
|
|
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
|
|
|
|
|
oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
|
|
|
|
|
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -412,7 +418,9 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
|
|
|
|
|
if rm_force not in [0, 1]:
|
|
|
|
|
raise ValueError("Invalid value, only 0 or 1 allowed")
|
|
|
|
|
user_data = bytearray([rm_recursive, rm_force])
|
|
|
|
|
user_data.extend(packet_source_dest_path("Copy"))
|
|
|
|
|
removed_file_or_dir = input("Specify absolute path to be removed: ")
|
|
|
|
|
user_data.extend(removed_file_or_dir.encode())
|
|
|
|
|
user_data.append(0)
|
|
|
|
|
q.add_log_cmd(Info.RM_HELPER)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
|
|
|
|
@@ -436,6 +444,14 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
|
|
|
|
|
CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code == OpCode.MKDIR_HELPER:
|
|
|
|
|
q.add_log_cmd(Info.MKDIR_HELPER)
|
|
|
|
|
user_data = input("Specify absolute path of newly created directory: ")
|
|
|
|
|
user_data = bytearray(user_data.encode())
|
|
|
|
|
user_data.append(0)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
_LOGGER.warning(
|
|
|
|
|
f"Unknown operation code {op_code} for core controller commands"
|
|
|
|
@@ -460,8 +476,8 @@ def list_directory_base_user_data() -> bytearray:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def packet_source_dest_path(context: str) -> bytes:
|
|
|
|
|
source = input(f"{context} source file")
|
|
|
|
|
dest = input(f"{context} destination file")
|
|
|
|
|
source = input(f"Specify {context} source file: ")
|
|
|
|
|
dest = input(f"Specify {context} destination file: ")
|
|
|
|
|
raw_src_dest = bytearray(source.encode())
|
|
|
|
|
raw_src_dest.append(0)
|
|
|
|
|
raw_src_dest.extend(dest.encode())
|
|
|
|
@@ -577,3 +593,49 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
|
|
|
|
)
|
|
|
|
|
pw.dlog(printout)
|
|
|
|
|
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_core_ctrl_action_replies(
|
|
|
|
|
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytes
|
|
|
|
|
):
|
|
|
|
|
pw = PrintWrapper(printer)
|
|
|
|
|
if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
|
|
|
|
|
if len(custom_data) < 4:
|
|
|
|
|
_LOGGER.warning("Data unexpectedly small")
|
|
|
|
|
return
|
|
|
|
|
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
|
|
|
|
|
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
|
|
|
|
|
compressed = custom_data[8]
|
|
|
|
|
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
|
|
|
|
|
# Include length of NULL termination
|
|
|
|
|
file_data_offset = 9 + len(ls_cmd) + 1
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Received directory listing dump for ls command {ls_cmd}. "
|
|
|
|
|
f"Chunk {seq_idx + 1}/{total_chunks}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
|
|
|
|
|
if seq_idx_ == 0 and path_.exists():
|
|
|
|
|
os.remove(path_)
|
|
|
|
|
|
|
|
|
|
if compressed:
|
|
|
|
|
path = Path("dir_listing.txt.gz")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
|
|
|
|
|
)
|
|
|
|
|
with open(path, "ab") as listing_file:
|
|
|
|
|
listing_file.write(custom_data[file_data_offset:])
|
|
|
|
|
else:
|
|
|
|
|
path = Path("dir_listing.txt")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt"
|
|
|
|
|
)
|
|
|
|
|
with open(path, "a") as listing_file:
|
|
|
|
|
listing_file_str = custom_data[file_data_offset:].decode()
|
|
|
|
|
listing_file.write(listing_file_str)
|
|
|
|
|
if seq_idx + 1 == total_chunks:
|
|
|
|
|
pw.dlog("Full directory listing: ")
|
|
|
|
|
with open("dir_listing.txt", "r") as listing_file:
|
|
|
|
|
print(listing_file.read())
|
|
|
|
|