diff --git a/eive_tmtc/pus_tm/action_reply_handler.py b/eive_tmtc/pus_tm/action_reply_handler.py
index 7a359c4..ac09253 100644
--- a/eive_tmtc/pus_tm/action_reply_handler.py
+++ b/eive_tmtc/pus_tm/action_reply_handler.py
@@ -37,6 +37,8 @@ def handle_action_reply(
         return handle_ploc_replies(action_id, printer, custom_data)
     elif object_id.as_bytes == PLOC_SUPV_ID:
         return handle_supervisor_replies(action_id, printer, custom_data)
+    elif object_id.as_bytes == CORE_CONTROLLER_ID:
+        return handle_core_ctrl_action_replies(action_id, printer, custom_data)
     elif object_id.as_bytes == STAR_TRACKER_ID:
         return handle_startracker_replies(action_id, printer, custom_data)
     elif object_id.as_bytes in [
diff --git a/eive_tmtc/tmtc/core.py b/eive_tmtc/tmtc/core.py
index 60fb766..706b582 100644
--- a/eive_tmtc/tmtc/core.py
+++ b/eive_tmtc/tmtc/core.py
@@ -591,3 +591,37 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
     )
     pw.dlog(printout)
     printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3)
+
+
+def handle_core_ctrl_action_replies(
+    action_id: int, printer: FsfwTmTcPrinter, custom_data: bytes
+):
+    pw = PrintWrapper(printer)
+    if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
+        # Chunk layout as parsed here: 4 byte sequence index, 4 byte total chunk
+        # count, 1 byte compression flag, NULL terminated ls command, file data.
+        if len(custom_data) < 9:
+            _LOGGER.warning("Data unexpectedly small")
+            return
+        seq_idx = struct.unpack("!I", custom_data[0:4])[0]
+        total_chunks = struct.unpack("!I", custom_data[4:8])[0]
+        compressed = custom_data[8]
+        ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
+        # Include length of NULL termination
+        file_data_offset = 9 + len(ls_cmd) + 1
+        pw.dlog(
+            f"Received directory listing dump for ls command {ls_cmd}. Chunk {seq_idx}/{total_chunks}"
+        )
+        if compressed:
+            pw.dlog(
+                f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
+            )
+            with open("dir_listing.txt.gz", "ab") as listing_file:
+                listing_file.write(custom_data[file_data_offset:])
+        else:
+            pw.dlog(
+                f"Compression option: {compressed}. Dumping file into dir_listing.txt"
+            )
+            with open("dir_listing.txt", "a") as listing_file:
+                listing_file_str = custom_data[file_data_offset:].decode()
+                listing_file.write(listing_file_str)
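
Note (not part of the patch): the chunk format parsed by handle_core_ctrl_action_replies can be exercised on the ground with a small round-trip sketch. The helper names and example values below are hypothetical and only assume the layout as parsed in the handler above: 4 byte sequence index, 4 byte total chunk count, 1 byte compression flag, NULL terminated ls command string, then the raw file data.

#!/usr/bin/env python3
"""Round-trip sketch for the directory listing dump chunk format parsed in
handle_core_ctrl_action_replies. Helper names and values are illustrative."""
import struct


def pack_dir_listing_chunk(
    seq_idx: int, total_chunks: int, compressed: bool, ls_cmd: str, file_data: bytes
) -> bytes:
    # 4 byte sequence index | 4 byte total chunk count | 1 byte compression flag
    # | NULL terminated ls command string | raw file data
    header = struct.pack("!IIB", seq_idx, total_chunks, compressed)
    return header + ls_cmd.encode() + b"\x00" + file_data


def parse_dir_listing_chunk(custom_data: bytes):
    # Mirrors the parsing logic of the handler added in this patch
    seq_idx, total_chunks, compressed = struct.unpack("!IIB", custom_data[0:9])
    ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
    file_data_offset = 9 + len(ls_cmd) + 1
    return seq_idx, total_chunks, compressed, ls_cmd, custom_data[file_data_offset:]


if __name__ == "__main__":
    chunk = pack_dir_listing_chunk(0, 2, False, "ls -al /tmp", b"example listing")
    print(parse_dir_listing_chunk(chunk))

Running the sketch should print the same sequence index, chunk count, compression flag, ls command and file data that were packed, which makes it a quick sanity check for the offsets used in the handler.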