Merge branch 'main' into cp-helper-force-flag
Some checks are pending
EIVE/-/pipeline/pr-main Build queued...
Some checks are pending
EIVE/-/pipeline/pr-main Build queued...
This commit is contained in:
commit
fe3accb9ad
@ -44,6 +44,8 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
FORWARD_SENSOR_TEMPS = False
|
FORWARD_SENSOR_TEMPS = False
|
||||||
|
# TODO: Transform this into a CLI argument
|
||||||
|
HK_OUTPUT_LEVEL = 1
|
||||||
|
|
||||||
|
|
||||||
def handle_hk_packet(
|
def handle_hk_packet(
|
||||||
@ -70,13 +72,14 @@ def handle_hk_packet(
|
|||||||
hk_data=hk_data,
|
hk_data=hk_data,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
handle_regular_hk_print(
|
if HK_OUTPUT_LEVEL > 0:
|
||||||
printer=printer,
|
handle_regular_hk_print(
|
||||||
object_id=named_obj_id,
|
printer=printer,
|
||||||
hk_packet=tm_packet,
|
object_id=named_obj_id,
|
||||||
tm=tm_packet.pus_tm,
|
hk_packet=tm_packet,
|
||||||
hk_data=hk_data,
|
tm=tm_packet.pus_tm,
|
||||||
)
|
hk_data=hk_data,
|
||||||
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
_LOGGER.exception(
|
_LOGGER.exception(
|
||||||
f"{e} error when parsing HK data coming from {named_obj_id}"
|
f"{e} error when parsing HK data coming from {named_obj_id}"
|
||||||
|
@ -32,6 +32,7 @@ class ActionId(enum.IntEnum):
|
|||||||
RESET_REBOOT_COUNTER = 6
|
RESET_REBOOT_COUNTER = 6
|
||||||
SWITCH_IMG_LOCK = 7
|
SWITCH_IMG_LOCK = 7
|
||||||
SET_MAX_REBOOT_CNT = 8
|
SET_MAX_REBOOT_CNT = 8
|
||||||
|
READ_REBOOT_MECHANISM_INFO = 9
|
||||||
UPDATE_OBSW_FROM_SD_0 = 10
|
UPDATE_OBSW_FROM_SD_0 = 10
|
||||||
UPDATE_OBSW_FROM_SD_1 = 11
|
UPDATE_OBSW_FROM_SD_1 = 11
|
||||||
UPDATE_OBSW_FROM_TMP = 12
|
UPDATE_OBSW_FROM_TMP = 12
|
||||||
@ -87,13 +88,14 @@ class OpCode:
|
|||||||
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
|
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
|
||||||
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
|
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
|
||||||
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
|
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
|
||||||
ENABLE_REBOOT_FILE_HANDLING = ["rbh_off"]
|
READ_REBOOT_MECHANISM_INFO = "rbh_info"
|
||||||
DISABLE_REBOOT_FILE_HANDLING = ["rbh_on"]
|
ENABLE_REBOOT_FILE_HANDLING = "rbh_off"
|
||||||
RESET_ALL_REBOOT_COUNTERS = ["rbh_reset_a"]
|
DISABLE_REBOOT_FILE_HANDLING = "rbh_on"
|
||||||
RESET_REBOOT_COUNTER_00 = ["rbh_reset_00"]
|
RESET_ALL_REBOOT_COUNTERS = "rbh_reset_a"
|
||||||
RESET_REBOOT_COUNTER_01 = ["rbh_reset_01"]
|
RESET_REBOOT_COUNTER_00 = "rbh_reset_00"
|
||||||
RESET_REBOOT_COUNTER_10 = ["rbh_reset_10"]
|
RESET_REBOOT_COUNTER_01 = "rbh_reset_01"
|
||||||
RESET_REBOOT_COUNTER_11 = ["rbh_reset_11"]
|
RESET_REBOOT_COUNTER_10 = "rbh_reset_10"
|
||||||
|
RESET_REBOOT_COUNTER_11 = "rbh_reset_11"
|
||||||
SET_MAX_REBOOT_CNT = ["rbh_max_cnt"]
|
SET_MAX_REBOOT_CNT = ["rbh_max_cnt"]
|
||||||
|
|
||||||
|
|
||||||
@ -110,6 +112,7 @@ class Info:
|
|||||||
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
|
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
|
||||||
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
|
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
|
||||||
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
|
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
|
||||||
|
READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
|
||||||
SWITCH_TO_SD_0 = "Switch to SD card 0"
|
SWITCH_TO_SD_0 = "Switch to SD card 0"
|
||||||
SWITCH_TO_SD_1 = "Switch to SD card 1"
|
SWITCH_TO_SD_1 = "Switch to SD card 1"
|
||||||
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
|
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
|
||||||
@ -154,6 +157,9 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
|||||||
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
|
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
|
||||||
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
|
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
|
||||||
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
|
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
|
||||||
|
oce.add(
|
||||||
|
keys=OpCode.READ_REBOOT_MECHANISM_INFO, info=Info.READ_REBOOT_MECHANISM_INFO
|
||||||
|
)
|
||||||
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
|
||||||
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
|
||||||
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
|
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
|
||||||
@ -294,7 +300,15 @@ def pack_core_commands( # noqa C901
|
|||||||
chip=Chip.CHIP_1,
|
chip=Chip.CHIP_1,
|
||||||
copy=Copy.COPY_1_GOLD,
|
copy=Copy.COPY_1_GOLD,
|
||||||
)
|
)
|
||||||
elif op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
|
elif op_code == OpCode.READ_REBOOT_MECHANISM_INFO:
|
||||||
|
q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_action_cmd(
|
||||||
|
object_id=CORE_CONTROLLER_ID,
|
||||||
|
action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif op_code == OpCode.DISABLE_REBOOT_FILE_HANDLING:
|
||||||
q.add_log_cmd("Disabling reboot file handling")
|
q.add_log_cmd("Disabling reboot file handling")
|
||||||
user_data = bytearray([0])
|
user_data = bytearray([0])
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
@ -304,7 +318,7 @@ def pack_core_commands( # noqa C901
|
|||||||
user_data=user_data,
|
user_data=user_data,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
|
elif op_code == OpCode.ENABLE_REBOOT_FILE_HANDLING:
|
||||||
q.add_log_cmd("Enabling reboot file handling")
|
q.add_log_cmd("Enabling reboot file handling")
|
||||||
user_data = bytearray([1])
|
user_data = bytearray([1])
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
@ -314,7 +328,7 @@ def pack_core_commands( # noqa C901
|
|||||||
user_data=user_data,
|
user_data=user_data,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
|
elif op_code == OpCode.RESET_ALL_REBOOT_COUNTERS:
|
||||||
q.add_log_cmd("Resetting all reboot counters")
|
q.add_log_cmd("Resetting all reboot counters")
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_action_cmd(
|
create_action_cmd(
|
||||||
@ -322,13 +336,13 @@ def pack_core_commands( # noqa C901
|
|||||||
action_id=ActionId.RESET_REBOOT_COUNTER,
|
action_id=ActionId.RESET_REBOOT_COUNTER,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif op_code in OpCode.RESET_REBOOT_COUNTER_00:
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_00:
|
||||||
reset_specific_boot_counter(q, 0, 0)
|
reset_specific_boot_counter(q, 0, 0)
|
||||||
elif op_code in OpCode.RESET_REBOOT_COUNTER_01:
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_01:
|
||||||
reset_specific_boot_counter(q, 0, 1)
|
reset_specific_boot_counter(q, 0, 1)
|
||||||
elif op_code in OpCode.RESET_REBOOT_COUNTER_10:
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_10:
|
||||||
reset_specific_boot_counter(q, 1, 0)
|
reset_specific_boot_counter(q, 1, 0)
|
||||||
elif op_code in OpCode.RESET_REBOOT_COUNTER_11:
|
elif op_code == OpCode.RESET_REBOOT_COUNTER_11:
|
||||||
reset_specific_boot_counter(q, 1, 1)
|
reset_specific_boot_counter(q, 1, 1)
|
||||||
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
|
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
|
||||||
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
|
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
|
||||||
@ -606,43 +620,85 @@ def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
|||||||
def handle_core_ctrl_action_replies(
|
def handle_core_ctrl_action_replies(
|
||||||
action_id: int, pw: PrintWrapper, custom_data: bytes
|
action_id: int, pw: PrintWrapper, custom_data: bytes
|
||||||
):
|
):
|
||||||
if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
|
if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
|
||||||
if len(custom_data) < 4:
|
handle_reboot_mechanism_info_reply(pw, custom_data)
|
||||||
_LOGGER.warning("Data unexpectedly small")
|
elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
|
||||||
return
|
handle_list_dir_dump_reply(pw, custom_data)
|
||||||
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
|
|
||||||
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
|
|
||||||
compressed = custom_data[8]
|
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
|
||||||
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
|
pw.dlog("Received reboot mechansm information")
|
||||||
# Include length of NULL termination
|
fmt_str = "!BIIIIIBBBBBBBB"
|
||||||
file_data_offset = 9 + len(ls_cmd) + 1
|
inc_len = struct.calcsize(fmt_str)
|
||||||
|
if len(custom_data) < inc_len:
|
||||||
|
raise ValueError(f"Received custom data shorter than expected {inc_len}")
|
||||||
|
(
|
||||||
|
enabled,
|
||||||
|
max_count,
|
||||||
|
img00_count,
|
||||||
|
img01_count,
|
||||||
|
img10_count,
|
||||||
|
img11_count,
|
||||||
|
img00_lock,
|
||||||
|
img01_lock,
|
||||||
|
img10_lock,
|
||||||
|
img11_lock,
|
||||||
|
last_chip,
|
||||||
|
last_copy,
|
||||||
|
next_chip,
|
||||||
|
next_copy,
|
||||||
|
) = struct.unpack(fmt_str, custom_data[:inc_len])
|
||||||
|
pw.dlog(f"Enabled: {enabled}")
|
||||||
|
pw.dlog(f"Max Count: {max_count}")
|
||||||
|
pw.dlog(f"Count 00: {img00_count}")
|
||||||
|
pw.dlog(f"Count 01: {img01_count}")
|
||||||
|
pw.dlog(f"Count 10: {img10_count}")
|
||||||
|
pw.dlog(f"Count 11: {img11_count}")
|
||||||
|
pw.dlog(f"Lock 00: {img00_lock}")
|
||||||
|
pw.dlog(f"Lock 01: {img01_lock}")
|
||||||
|
pw.dlog(f"Lock 10: {img10_lock}")
|
||||||
|
pw.dlog(f"Lock 11: {img11_lock}")
|
||||||
|
pw.dlog(f"Last Chip: {last_chip}")
|
||||||
|
pw.dlog(f"Last Copy: {last_copy}")
|
||||||
|
pw.dlog(f"Next Chip: {next_chip}")
|
||||||
|
pw.dlog(f"Next Copy: {next_copy}")
|
||||||
|
|
||||||
|
|
||||||
|
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
|
||||||
|
if len(custom_data) < 4:
|
||||||
|
_LOGGER.warning("Data unexpectedly small")
|
||||||
|
return
|
||||||
|
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
|
||||||
|
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
|
||||||
|
compressed = custom_data[8]
|
||||||
|
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
|
||||||
|
# Include length of NULL termination
|
||||||
|
file_data_offset = 9 + len(ls_cmd) + 1
|
||||||
|
pw.dlog(
|
||||||
|
f"Received directory listing dump for ls command {ls_cmd}. "
|
||||||
|
f"Chunk {seq_idx + 1}/{total_chunks}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
|
||||||
|
if seq_idx_ == 0 and path_.exists():
|
||||||
|
os.remove(path_)
|
||||||
|
|
||||||
|
if compressed:
|
||||||
|
path = Path("dir_listing.txt.gz")
|
||||||
|
remove_if_exists_and_new(seq_idx, path)
|
||||||
pw.dlog(
|
pw.dlog(
|
||||||
f"Received directory listing dump for ls command {ls_cmd}. "
|
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
|
||||||
f"Chunk {seq_idx + 1}/{total_chunks}"
|
|
||||||
)
|
)
|
||||||
|
with open(path, "ab") as listing_file:
|
||||||
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
|
listing_file.write(custom_data[file_data_offset:])
|
||||||
if seq_idx_ == 0 and path_.exists():
|
else:
|
||||||
os.remove(path_)
|
path = Path("dir_listing.txt")
|
||||||
|
remove_if_exists_and_new(seq_idx, path)
|
||||||
if compressed:
|
pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
|
||||||
path = Path("dir_listing.txt.gz")
|
with open(path, "a") as listing_file:
|
||||||
remove_if_exists_and_new(seq_idx, path)
|
listing_file_str = custom_data[file_data_offset:].decode()
|
||||||
pw.dlog(
|
listing_file.write(listing_file_str)
|
||||||
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
|
if seq_idx + 1 == total_chunks:
|
||||||
)
|
pw.dlog("Full directory listing: ")
|
||||||
with open(path, "ab") as listing_file:
|
with open("dir_listing.txt", "r") as listing_file:
|
||||||
listing_file.write(custom_data[file_data_offset:])
|
print(listing_file.read())
|
||||||
else:
|
|
||||||
path = Path("dir_listing.txt")
|
|
||||||
remove_if_exists_and_new(seq_idx, path)
|
|
||||||
pw.dlog(
|
|
||||||
f"Compression option: {compressed}. Dumping file into dir_listing.txt"
|
|
||||||
)
|
|
||||||
with open(path, "a") as listing_file:
|
|
||||||
listing_file_str = custom_data[file_data_offset:].decode()
|
|
||||||
listing_file.write(listing_file_str)
|
|
||||||
if seq_idx + 1 == total_chunks:
|
|
||||||
pw.dlog("Full directory listing: ")
|
|
||||||
with open("dir_listing.txt", "r") as listing_file:
|
|
||||||
print(listing_file.read())
|
|
||||||
|
Loading…
Reference in New Issue
Block a user