Merge pull request 'Core Controller Extension' (#207) from core-ctrl-extension into main
All checks were successful
EIVE/-/pipeline/head This commit looks good

Reviewed-on: #207
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
This commit is contained in:
Marius Eggert 2023-06-22 16:25:31 +02:00
commit c7df13fa68
2 changed files with 119 additions and 60 deletions

View File

@ -44,6 +44,8 @@ _LOGGER = logging.getLogger(__name__)
FORWARD_SENSOR_TEMPS = False
# TODO: Transform this into a CLI argument
HK_OUTPUT_LEVEL = 1
def handle_hk_packet(
@ -70,13 +72,14 @@ def handle_hk_packet(
hk_data=hk_data,
)
try:
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
tm=tm_packet.pus_tm,
hk_data=hk_data,
)
if HK_OUTPUT_LEVEL > 0:
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
tm=tm_packet.pus_tm,
hk_data=hk_data,
)
except ValueError as e:
_LOGGER.exception(
f"{e} error when parsing HK data coming from {named_obj_id}"

View File

@ -32,6 +32,7 @@ class ActionId(enum.IntEnum):
RESET_REBOOT_COUNTER = 6
SWITCH_IMG_LOCK = 7
SET_MAX_REBOOT_CNT = 8
READ_REBOOT_MECHANISM_INFO = 9
UPDATE_OBSW_FROM_SD_0 = 10
UPDATE_OBSW_FROM_SD_1 = 11
UPDATE_OBSW_FROM_TMP = 12
@ -87,13 +88,14 @@ class OpCode:
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
ENABLE_REBOOT_FILE_HANDLING = ["rbh_off"]
DISABLE_REBOOT_FILE_HANDLING = ["rbh_on"]
RESET_ALL_REBOOT_COUNTERS = ["rbh_reset_a"]
RESET_REBOOT_COUNTER_00 = ["rbh_reset_00"]
RESET_REBOOT_COUNTER_01 = ["rbh_reset_01"]
RESET_REBOOT_COUNTER_10 = ["rbh_reset_10"]
RESET_REBOOT_COUNTER_11 = ["rbh_reset_11"]
READ_REBOOT_MECHANISM_INFO = "rbh_info"
ENABLE_REBOOT_FILE_HANDLING = "rbh_off"
DISABLE_REBOOT_FILE_HANDLING = "rbh_on"
RESET_ALL_REBOOT_COUNTERS = "rbh_reset_a"
RESET_REBOOT_COUNTER_00 = "rbh_reset_00"
RESET_REBOOT_COUNTER_01 = "rbh_reset_01"
RESET_REBOOT_COUNTER_10 = "rbh_reset_10"
RESET_REBOOT_COUNTER_11 = "rbh_reset_11"
SET_MAX_REBOOT_CNT = ["rbh_max_cnt"]
@ -110,6 +112,7 @@ class Info:
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
SWITCH_TO_SD_0 = "Switch to SD card 0"
SWITCH_TO_SD_1 = "Switch to SD card 1"
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
@ -154,6 +157,9 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
oce.add(
keys=OpCode.READ_REBOOT_MECHANISM_INFO, info=Info.READ_REBOOT_MECHANISM_INFO
)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
@ -294,7 +300,15 @@ def pack_core_commands( # noqa C901
chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD,
)
elif op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
elif op_code == OpCode.READ_REBOOT_MECHANISM_INFO:
q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
)
)
elif op_code == OpCode.DISABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Disabling reboot file handling")
user_data = bytearray([0])
q.add_pus_tc(
@ -304,7 +318,7 @@ def pack_core_commands( # noqa C901
user_data=user_data,
)
)
elif op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
elif op_code == OpCode.ENABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Enabling reboot file handling")
user_data = bytearray([1])
q.add_pus_tc(
@ -314,7 +328,7 @@ def pack_core_commands( # noqa C901
user_data=user_data,
)
)
elif op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
elif op_code == OpCode.RESET_ALL_REBOOT_COUNTERS:
q.add_log_cmd("Resetting all reboot counters")
q.add_pus_tc(
create_action_cmd(
@ -322,13 +336,13 @@ def pack_core_commands( # noqa C901
action_id=ActionId.RESET_REBOOT_COUNTER,
)
)
elif op_code in OpCode.RESET_REBOOT_COUNTER_00:
elif op_code == OpCode.RESET_REBOOT_COUNTER_00:
reset_specific_boot_counter(q, 0, 0)
elif op_code in OpCode.RESET_REBOOT_COUNTER_01:
elif op_code == OpCode.RESET_REBOOT_COUNTER_01:
reset_specific_boot_counter(q, 0, 1)
elif op_code in OpCode.RESET_REBOOT_COUNTER_10:
elif op_code == OpCode.RESET_REBOOT_COUNTER_10:
reset_specific_boot_counter(q, 1, 0)
elif op_code in OpCode.RESET_REBOOT_COUNTER_11:
elif op_code == OpCode.RESET_REBOOT_COUNTER_11:
reset_specific_boot_counter(q, 1, 1)
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
@ -385,7 +399,7 @@ def pack_core_commands( # noqa C901
domain_id=0,
unique_id=ParamId.PREF_SD,
parameter=pref_sd,
).pack()
)
)
)
elif op_code == OpCode.CP_HELPER:
@ -603,43 +617,85 @@ def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
def handle_core_ctrl_action_replies(
action_id: int, pw: PrintWrapper, custom_data: bytes
):
if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
if len(custom_data) < 4:
_LOGGER.warning("Data unexpectedly small")
return
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
compressed = custom_data[8]
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
# Include length of NULL termination
file_data_offset = 9 + len(ls_cmd) + 1
if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
handle_reboot_mechanism_info_reply(pw, custom_data)
elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
handle_list_dir_dump_reply(pw, custom_data)
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
pw.dlog("Received reboot mechansm information")
fmt_str = "!BIIIIIBBBBBBBB"
inc_len = struct.calcsize(fmt_str)
if len(custom_data) < inc_len:
raise ValueError(f"Received custom data shorter than expected {inc_len}")
(
enabled,
max_count,
img00_count,
img01_count,
img10_count,
img11_count,
img00_lock,
img01_lock,
img10_lock,
img11_lock,
last_chip,
last_copy,
next_chip,
next_copy,
) = struct.unpack(fmt_str, custom_data[:inc_len])
pw.dlog(f"Enabled: {enabled}")
pw.dlog(f"Max Count: {max_count}")
pw.dlog(f"Count 00: {img00_count}")
pw.dlog(f"Count 01: {img01_count}")
pw.dlog(f"Count 10: {img10_count}")
pw.dlog(f"Count 11: {img11_count}")
pw.dlog(f"Lock 00: {img00_lock}")
pw.dlog(f"Lock 01: {img01_lock}")
pw.dlog(f"Lock 10: {img10_lock}")
pw.dlog(f"Lock 11: {img11_lock}")
pw.dlog(f"Last Chip: {last_chip}")
pw.dlog(f"Last Copy: {last_copy}")
pw.dlog(f"Next Chip: {next_chip}")
pw.dlog(f"Next Copy: {next_copy}")
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
if len(custom_data) < 4:
_LOGGER.warning("Data unexpectedly small")
return
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
compressed = custom_data[8]
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
# Include length of NULL termination
file_data_offset = 9 + len(ls_cmd) + 1
pw.dlog(
f"Received directory listing dump for ls command {ls_cmd}. "
f"Chunk {seq_idx + 1}/{total_chunks}"
)
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
if seq_idx_ == 0 and path_.exists():
os.remove(path_)
if compressed:
path = Path("dir_listing.txt.gz")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(
f"Received directory listing dump for ls command {ls_cmd}. "
f"Chunk {seq_idx + 1}/{total_chunks}"
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
)
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
if seq_idx_ == 0 and path_.exists():
os.remove(path_)
if compressed:
path = Path("dir_listing.txt.gz")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
)
with open(path, "ab") as listing_file:
listing_file.write(custom_data[file_data_offset:])
else:
path = Path("dir_listing.txt")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(
f"Compression option: {compressed}. Dumping file into dir_listing.txt"
)
with open(path, "a") as listing_file:
listing_file_str = custom_data[file_data_offset:].decode()
listing_file.write(listing_file_str)
if seq_idx + 1 == total_chunks:
pw.dlog("Full directory listing: ")
with open("dir_listing.txt", "r") as listing_file:
print(listing_file.read())
with open(path, "ab") as listing_file:
listing_file.write(custom_data[file_data_offset:])
else:
path = Path("dir_listing.txt")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
with open(path, "a") as listing_file:
listing_file_str = custom_data[file_data_offset:].decode()
listing_file.write(listing_file_str)
if seq_idx + 1 == total_chunks:
pw.dlog("Full directory listing: ")
with open("dir_listing.txt", "r") as listing_file:
print(listing_file.read())