|
|
|
@@ -32,6 +32,7 @@ class ActionId(enum.IntEnum):
|
|
|
|
|
RESET_REBOOT_COUNTER = 6
|
|
|
|
|
SWITCH_IMG_LOCK = 7
|
|
|
|
|
SET_MAX_REBOOT_CNT = 8
|
|
|
|
|
READ_REBOOT_MECHANISM_INFO = 9
|
|
|
|
|
UPDATE_OBSW_FROM_SD_0 = 10
|
|
|
|
|
UPDATE_OBSW_FROM_SD_1 = 11
|
|
|
|
|
UPDATE_OBSW_FROM_TMP = 12
|
|
|
|
@@ -87,13 +88,14 @@ class OpCode:
|
|
|
|
|
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
|
|
|
|
|
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
|
|
|
|
|
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
|
|
|
|
|
ENABLE_REBOOT_FILE_HANDLING = ["rbh_off"]
|
|
|
|
|
DISABLE_REBOOT_FILE_HANDLING = ["rbh_on"]
|
|
|
|
|
RESET_ALL_REBOOT_COUNTERS = ["rbh_reset_a"]
|
|
|
|
|
RESET_REBOOT_COUNTER_00 = ["rbh_reset_00"]
|
|
|
|
|
RESET_REBOOT_COUNTER_01 = ["rbh_reset_01"]
|
|
|
|
|
RESET_REBOOT_COUNTER_10 = ["rbh_reset_10"]
|
|
|
|
|
RESET_REBOOT_COUNTER_11 = ["rbh_reset_11"]
|
|
|
|
|
READ_REBOOT_MECHANISM_INFO = "rbh_info"
|
|
|
|
|
ENABLE_REBOOT_FILE_HANDLING = "rbh_off"
|
|
|
|
|
DISABLE_REBOOT_FILE_HANDLING = "rbh_on"
|
|
|
|
|
RESET_ALL_REBOOT_COUNTERS = "rbh_reset_a"
|
|
|
|
|
RESET_REBOOT_COUNTER_00 = "rbh_reset_00"
|
|
|
|
|
RESET_REBOOT_COUNTER_01 = "rbh_reset_01"
|
|
|
|
|
RESET_REBOOT_COUNTER_10 = "rbh_reset_10"
|
|
|
|
|
RESET_REBOOT_COUNTER_11 = "rbh_reset_11"
|
|
|
|
|
SET_MAX_REBOOT_CNT = ["rbh_max_cnt"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -110,6 +112,7 @@ class Info:
|
|
|
|
|
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
|
|
|
|
|
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
|
|
|
|
|
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
|
|
|
|
|
READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
|
|
|
|
|
SWITCH_TO_SD_0 = "Switch to SD card 0"
|
|
|
|
|
SWITCH_TO_SD_1 = "Switch to SD card 1"
|
|
|
|
|
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
|
|
|
|
@@ -154,6 +157,9 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
|
|
|
|
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
|
|
|
|
|
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
|
|
|
|
|
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
|
|
|
|
|
oce.add(
|
|
|
|
|
keys=OpCode.READ_REBOOT_MECHANISM_INFO, info=Info.READ_REBOOT_MECHANISM_INFO
|
|
|
|
|
)
|
|
|
|
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
|
|
|
|
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
|
|
|
|
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
|
|
|
|
@@ -294,7 +300,15 @@ def pack_core_commands( # noqa C901
|
|
|
|
|
chip=Chip.CHIP_1,
|
|
|
|
|
copy=Copy.COPY_1_GOLD,
|
|
|
|
|
)
|
|
|
|
|
elif op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
|
|
|
|
|
elif op_code == OpCode.READ_REBOOT_MECHANISM_INFO:
|
|
|
|
|
q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_action_cmd(
|
|
|
|
|
object_id=CORE_CONTROLLER_ID,
|
|
|
|
|
action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code == OpCode.DISABLE_REBOOT_FILE_HANDLING:
|
|
|
|
|
q.add_log_cmd("Disabling reboot file handling")
|
|
|
|
|
user_data = bytearray([0])
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
@@ -304,7 +318,7 @@ def pack_core_commands( # noqa C901
|
|
|
|
|
user_data=user_data,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
|
|
|
|
|
elif op_code == OpCode.ENABLE_REBOOT_FILE_HANDLING:
|
|
|
|
|
q.add_log_cmd("Enabling reboot file handling")
|
|
|
|
|
user_data = bytearray([1])
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
@@ -314,7 +328,7 @@ def pack_core_commands( # noqa C901
|
|
|
|
|
user_data=user_data,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
|
|
|
|
|
elif op_code == OpCode.RESET_ALL_REBOOT_COUNTERS:
|
|
|
|
|
q.add_log_cmd("Resetting all reboot counters")
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
|
create_action_cmd(
|
|
|
|
@@ -322,13 +336,13 @@ def pack_core_commands( # noqa C901
|
|
|
|
|
action_id=ActionId.RESET_REBOOT_COUNTER,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code in OpCode.RESET_REBOOT_COUNTER_00:
|
|
|
|
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_00:
|
|
|
|
|
reset_specific_boot_counter(q, 0, 0)
|
|
|
|
|
elif op_code in OpCode.RESET_REBOOT_COUNTER_01:
|
|
|
|
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_01:
|
|
|
|
|
reset_specific_boot_counter(q, 0, 1)
|
|
|
|
|
elif op_code in OpCode.RESET_REBOOT_COUNTER_10:
|
|
|
|
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_10:
|
|
|
|
|
reset_specific_boot_counter(q, 1, 0)
|
|
|
|
|
elif op_code in OpCode.RESET_REBOOT_COUNTER_11:
|
|
|
|
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_11:
|
|
|
|
|
reset_specific_boot_counter(q, 1, 1)
|
|
|
|
|
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
|
|
|
|
|
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
|
|
|
|
@@ -385,14 +399,17 @@ def pack_core_commands( # noqa C901
|
|
|
|
|
domain_id=0,
|
|
|
|
|
unique_id=ParamId.PREF_SD,
|
|
|
|
|
parameter=pref_sd,
|
|
|
|
|
).pack()
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
elif op_code == OpCode.CP_HELPER:
|
|
|
|
|
cp_recursive = int(input("Copy recursively (0/1) ?: "))
|
|
|
|
|
if cp_recursive not in [0, 1]:
|
|
|
|
|
raise ValueError("Invalid value, only 0 or 1 allowed")
|
|
|
|
|
user_data = bytearray([cp_recursive])
|
|
|
|
|
cp_force = int(input("Copy with force option(0/1) ?: "))
|
|
|
|
|
if cp_force not in [0, 1]:
|
|
|
|
|
raise ValueError("Invalid value, only 0 or 1 allowed")
|
|
|
|
|
user_data = bytearray([cp_recursive, cp_force])
|
|
|
|
|
user_data.extend(packet_source_dest_path("Copy"))
|
|
|
|
|
q.add_log_cmd(Info.CP_HELPER)
|
|
|
|
|
q.add_pus_tc(
|
|
|
|
@@ -603,43 +620,85 @@ def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
|
|
|
|
def handle_core_ctrl_action_replies(
|
|
|
|
|
action_id: int, pw: PrintWrapper, custom_data: bytes
|
|
|
|
|
):
|
|
|
|
|
if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
|
|
|
|
|
if len(custom_data) < 4:
|
|
|
|
|
_LOGGER.warning("Data unexpectedly small")
|
|
|
|
|
return
|
|
|
|
|
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
|
|
|
|
|
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
|
|
|
|
|
compressed = custom_data[8]
|
|
|
|
|
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
|
|
|
|
|
# Include length of NULL termination
|
|
|
|
|
file_data_offset = 9 + len(ls_cmd) + 1
|
|
|
|
|
if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
|
|
|
|
|
handle_reboot_mechanism_info_reply(pw, custom_data)
|
|
|
|
|
elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
|
|
|
|
|
handle_list_dir_dump_reply(pw, custom_data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
|
|
|
|
|
pw.dlog("Received reboot mechansm information")
|
|
|
|
|
fmt_str = "!BIIIIIBBBBBBBB"
|
|
|
|
|
inc_len = struct.calcsize(fmt_str)
|
|
|
|
|
if len(custom_data) < inc_len:
|
|
|
|
|
raise ValueError(f"Received custom data shorter than expected {inc_len}")
|
|
|
|
|
(
|
|
|
|
|
enabled,
|
|
|
|
|
max_count,
|
|
|
|
|
img00_count,
|
|
|
|
|
img01_count,
|
|
|
|
|
img10_count,
|
|
|
|
|
img11_count,
|
|
|
|
|
img00_lock,
|
|
|
|
|
img01_lock,
|
|
|
|
|
img10_lock,
|
|
|
|
|
img11_lock,
|
|
|
|
|
last_chip,
|
|
|
|
|
last_copy,
|
|
|
|
|
next_chip,
|
|
|
|
|
next_copy,
|
|
|
|
|
) = struct.unpack(fmt_str, custom_data[:inc_len])
|
|
|
|
|
pw.dlog(f"Enabled: {enabled}")
|
|
|
|
|
pw.dlog(f"Max Count: {max_count}")
|
|
|
|
|
pw.dlog(f"Count 00: {img00_count}")
|
|
|
|
|
pw.dlog(f"Count 01: {img01_count}")
|
|
|
|
|
pw.dlog(f"Count 10: {img10_count}")
|
|
|
|
|
pw.dlog(f"Count 11: {img11_count}")
|
|
|
|
|
pw.dlog(f"Lock 00: {img00_lock}")
|
|
|
|
|
pw.dlog(f"Lock 01: {img01_lock}")
|
|
|
|
|
pw.dlog(f"Lock 10: {img10_lock}")
|
|
|
|
|
pw.dlog(f"Lock 11: {img11_lock}")
|
|
|
|
|
pw.dlog(f"Last Chip: {last_chip}")
|
|
|
|
|
pw.dlog(f"Last Copy: {last_copy}")
|
|
|
|
|
pw.dlog(f"Next Chip: {next_chip}")
|
|
|
|
|
pw.dlog(f"Next Copy: {next_copy}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
|
|
|
|
|
if len(custom_data) < 4:
|
|
|
|
|
_LOGGER.warning("Data unexpectedly small")
|
|
|
|
|
return
|
|
|
|
|
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
|
|
|
|
|
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
|
|
|
|
|
compressed = custom_data[8]
|
|
|
|
|
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
|
|
|
|
|
# Include length of NULL termination
|
|
|
|
|
file_data_offset = 9 + len(ls_cmd) + 1
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Received directory listing dump for ls command {ls_cmd}. "
|
|
|
|
|
f"Chunk {seq_idx + 1}/{total_chunks}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
|
|
|
|
|
if seq_idx_ == 0 and path_.exists():
|
|
|
|
|
os.remove(path_)
|
|
|
|
|
|
|
|
|
|
if compressed:
|
|
|
|
|
path = Path("dir_listing.txt.gz")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Received directory listing dump for ls command {ls_cmd}. "
|
|
|
|
|
f"Chunk {seq_idx + 1}/{total_chunks}"
|
|
|
|
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
|
|
|
|
|
if seq_idx_ == 0 and path_.exists():
|
|
|
|
|
os.remove(path_)
|
|
|
|
|
|
|
|
|
|
if compressed:
|
|
|
|
|
path = Path("dir_listing.txt.gz")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
|
|
|
|
|
)
|
|
|
|
|
with open(path, "ab") as listing_file:
|
|
|
|
|
listing_file.write(custom_data[file_data_offset:])
|
|
|
|
|
else:
|
|
|
|
|
path = Path("dir_listing.txt")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(
|
|
|
|
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt"
|
|
|
|
|
)
|
|
|
|
|
with open(path, "a") as listing_file:
|
|
|
|
|
listing_file_str = custom_data[file_data_offset:].decode()
|
|
|
|
|
listing_file.write(listing_file_str)
|
|
|
|
|
if seq_idx + 1 == total_chunks:
|
|
|
|
|
pw.dlog("Full directory listing: ")
|
|
|
|
|
with open("dir_listing.txt", "r") as listing_file:
|
|
|
|
|
print(listing_file.read())
|
|
|
|
|
with open(path, "ab") as listing_file:
|
|
|
|
|
listing_file.write(custom_data[file_data_offset:])
|
|
|
|
|
else:
|
|
|
|
|
path = Path("dir_listing.txt")
|
|
|
|
|
remove_if_exists_and_new(seq_idx, path)
|
|
|
|
|
pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
|
|
|
|
|
with open(path, "a") as listing_file:
|
|
|
|
|
listing_file_str = custom_data[file_data_offset:].decode()
|
|
|
|
|
listing_file.write(listing_file_str)
|
|
|
|
|
if seq_idx + 1 == total_chunks:
|
|
|
|
|
pw.dlog("Full directory listing: ")
|
|
|
|
|
with open("dir_listing.txt", "r") as listing_file:
|
|
|
|
|
print(listing_file.read())
|
|
|
|
|