eive-tmtc/eive_tmtc/tmtc/core.py
Robin Mueller dccbf89da1
All checks were successful
EIVE/-/pipeline/head This commit looks good
some more fixes
2024-05-08 10:49:20 +02:00

724 lines
26 KiB
Python

import enum
import logging
import os
import struct
from pathlib import Path
from typing import Tuple
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import CmdTreeNode
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.pus.s20_fsfw_param import (
create_scalar_u8_parameter,
create_load_param_cmd,
)
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s11_tc_sched import (
create_enable_tc_sched_cmd,
create_disable_tc_sched_cmd,
)
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
_LOGGER = logging.getLogger(__name__)
class SdState(enum.IntEnum):
OFF = 0
ON = 1
MOUNTED = 2
class SdCardSelect(enum.IntEnum):
SD_0 = 0
SD_1 = 1
BOTH = 2
NONE = 3
class ActionId(enum.IntEnum):
ANNOUNCE_VERSION = 1
ANNOUNCE_CURRENT_IMAGE = 2
ANNOUNCE_BOOT_COUNTS = 3
SWITCH_REBOOT_FILE_HANDLING = 5
RESET_REBOOT_COUNTER = 6
SWITCH_IMG_LOCK = 7
SET_MAX_REBOOT_CNT = 8
READ_REBOOT_MECHANISM_INFO = 9
UPDATE_OBSW_FROM_SD_0 = 10
UPDATE_OBSW_FROM_SD_1 = 11
UPDATE_OBSW_FROM_TMP = 12
SWITCH_TO_SD_0 = 16
SWITCH_TO_SD_1 = 17
SWITCH_TO_BOTH_SD_CARDS = 18
AUTO_SWITCH_ENABLE = 19
AUTO_SWITCH_DISABLE = 20
XSC_REBOOT = 32
FULL_REBOOT = 34
EXECUTE_SHELL_CMD_BLOCKING = 40
EXECUTE_SHELL_CMD_NON_BLOCKING = 41
SYSTEMCTL_CMD_EXECUTOR = 42
LIST_DIR_INTO_FILE = 50
LIST_DIR_DUMP_DIRECTLY = 51
CP_HELPER = 52
MV_HELPER = 53
RM_HELPER = 54
MKDIR_HELPER = 55
ENABLE_SCHEDULER = 56
UPDATE_LEAP_SECONRS = 60
class ParamId(enum.IntEnum):
PREF_SD = 0
class SetId(enum.IntEnum):
HK = 5
class OpCode:
ANNOUNCE_VERSION = "announce_version"
ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
LIST_DIR_INTO_FILE = "list_dir_into_file"
LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly"
CP_HELPER = "cp_helper"
MV_HELPER = "mv_helper"
RM_HELPER = "rm_helper"
MKDIR_HELPER = "mkdir_helper"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = "reboot_xsc"
XSC_REBOOT_SELF = "reboot_self"
XSC_REBOOT_0_0 = "reboot_00"
XSC_REBOOT_0_1 = "reboot_01"
XSC_REBOOT_1_0 = "reboot_10"
XSC_REBOOT_1_1 = "reboot_11"
REBOOT_FULL = "reboot_regular"
GET_HK = "get_hk"
OBSW_UPDATE_FROM_SD_0 = "obsw_update_sd0"
OBSW_UPDATE_FROM_SD_1 = "obsw_update_sd1"
OBSW_UPDATE_FROM_TMP = "obsw_update_tmp"
SWITCH_TO_SD_0 = "switch_to_sd_0"
SWITCH_TO_SD_1 = "switch_to_sd_1"
SWITCH_TO_BOTH_SD_CARDS = "switch_to_both_sd_cards"
READ_REBOOT_MECHANISM_INFO = "rbh_info"
ENABLE_REBOOT_FILE_HANDLING = "rwd_on"
DISABLE_REBOOT_FILE_HANDLING = "rwd_off"
RESET_ALL_REBOOT_COUNTERS = "rwd_reset_a"
RWD_RESET_REBOOT_COUNTER_00 = "rwd_reset_00"
RWD_RESET_REBOOT_COUNTER_01 = "rwd_reset_01"
RWD_RESET_REBOOT_COUNTER_10 = "rwd_reset_10"
RWD_RESET_REBOOT_COUNTER_11 = "rwd_reset_11"
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
AUTO_SWITCH_ENABLE = "auto_switch_enable"
AUTO_SWITCH_DISABLE = "auto_switch_disable"
ENABLE_SCHEDULER = "enable_scheduler"
DISABLE_SCHEDULER = "disable_scheduler"
UPDATE_LEAP_SECONDS = "leap_seconds_update"
class Info:
ANNOUNCE_VERSION = "Announce version"
ANNOUNCE_CURRENT_IMAGE = "Announce current image"
ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"
SET_PREF_SD = "Set preferred SD card"
REBOOT_XSC = "XSC reboot with prompt"
REBOOT_FULL = "Full regular reboot"
XSC_REBOOT_SELF = "Reboot Self"
XSC_REBOOT_0_0 = "Reboot to 0 0"
XSC_REBOOT_0_1 = "Reboot to 0 1"
XSC_REBOOT_1_0 = "Reboot to 1 0"
XSC_REBOOT_1_1 = "Reboot to 1 1"
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
SWITCH_TO_SD_0 = "Switch to SD card 0"
SWITCH_TO_SD_1 = "Switch to SD card 1"
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
LIST_DIR_INTO_FILE = "List directory, dump output into file"
LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly"
CP_HELPER = "Filesystem Copy Helper"
MV_HELPER = "Filesystem Move Helper"
RM_HELPER = "Filesystem Removal Helper"
MKDIR_HELPER = "Filesystem Directory Creation Helper"
ENABLE_REBOOT_FILE_HANDLING = "Enable reboot file handling"
DISABLE_REBOOT_FILE_HANDLING = "Disable reboot file handling"
RESET_ALL_REBOOT_COUNTERS = "Reset all reboot counters"
RWD_RESET_REBOOT_COUNTER_00 = "Reset reboot counter 0 0"
RWD_RESET_REBOOT_COUNTER_01 = "Reset reboot counter 0 0"
RWD_RESET_REBOOT_COUNTER_10 = "Reset reboot counter 1 0"
GET_HK = "Get HK set"
RWD_RESET_REBOOT_COUNTER_11 = "Reset reboot counter 1 1"
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
ENABLE_SCHEDULER = "Enable scheduler"
DISABLE_SCHEDULER = "Disable scheduler"
UPDATE_LEAP_SECONDS = "Updates the Leap Seconds"
class Chip(enum.IntEnum):
CHIP_0 = 0
CHIP_1 = 1
NONE = 2
class Copy(enum.IntEnum):
COPY_0_NOM = 0
COPY_1_GOLD = 1
NONE = 2
class SystemctlCmd(enum.IntEnum):
START = 0
STOP = 1
RESTART = 2
def create_core_node() -> CmdTreeNode:
op_code_strs = [
getattr(OpCode, key) for key in dir(OpCode) if not key.startswith("__")
]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("core", "Core Controller", hide_children_for_print=True)
for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info))
return node
def pack_core_commands( # noqa C901
q: DefaultPusQueueHelper, cmd_str: str
): # noqa: C901 , complexity okay here
if cmd_str == OpCode.ANNOUNCE_VERSION:
q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
elif cmd_str == OpCode.ANNOUNCE_CURRENT_IMAGE:
q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
)
elif cmd_str == OpCode.ANNOUNCE_BOOT_COUNTS:
q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
)
elif cmd_str == OpCode.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params()
add_xsc_reboot_cmd(
q=q,
reboot_self=reboot_self,
chip=chip_select,
copy=copy_select,
)
elif cmd_str == OpCode.REBOOT_FULL:
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
)
elif cmd_str == OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True)
elif cmd_str == OpCode.SYSTEMCTL_CMD_EXECUTOR:
print("systemctl command types: ")
for entry in SystemctlCmd:
print(f"{entry}: {entry.name}")
systemctl_cmd = SystemctlCmd(
int(input("Specify systemctl command type by key: "))
)
unit_name = input("Specify unit name: ")
q.add_pus_tc(create_systemctl_cmd(systemctl_cmd, unit_name))
elif cmd_str == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif cmd_str == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif cmd_str == OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
)
elif cmd_str == OpCode.XSC_REBOOT_0_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD,
)
elif cmd_str == OpCode.XSC_REBOOT_1_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
)
elif cmd_str == OpCode.XSC_REBOOT_1_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD,
)
elif cmd_str == OpCode.READ_REBOOT_MECHANISM_INFO:
q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
)
)
elif cmd_str == OpCode.DISABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Disabling reboot file handling")
user_data = bytearray([0])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif cmd_str == OpCode.ENABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Enabling reboot file handling")
user_data = bytearray([1])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif cmd_str == OpCode.RESET_ALL_REBOOT_COUNTERS:
q.add_log_cmd("Resetting all reboot counters")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
)
)
elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_00:
reset_specific_boot_counter(q, 0, 0)
elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_01:
reset_specific_boot_counter(q, 0, 1)
elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_10:
reset_specific_boot_counter(q, 1, 0)
elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_11:
reset_specific_boot_counter(q, 1, 1)
elif cmd_str == OpCode.RWD_SET_MAX_REBOOT_CNT:
max_count = int(input("Set new maximum reboot threshold [1, 50]: "))
if max_count < 1 or max_count > 50:
raise ValueError("Invalid value, must be in range 1 to 50")
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID,
ActionId.SET_MAX_REBOOT_CNT,
user_data=bytes([max_count]),
)
)
elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_1:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
elif cmd_str == OpCode.OBSW_UPDATE_FROM_TMP:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
elif cmd_str == OpCode.AUTO_SWITCH_ENABLE:
q.add_log_cmd(Info.AUTO_SWITCH_ENABLE)
chip, copy = determine_chip_and_copy()
user_data = bytes([chip, copy])
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_ENABLE, user_data
)
)
elif cmd_str == OpCode.AUTO_SWITCH_DISABLE:
q.add_log_cmd(Info.AUTO_SWITCH_DISABLE)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_DISABLE)
)
elif cmd_str == OpCode.SWITCH_TO_SD_0:
q.add_log_cmd(Info.SWITCH_TO_SD_0)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
)
)
elif cmd_str == OpCode.SWITCH_TO_SD_1:
q.add_log_cmd(Info.SWITCH_TO_SD_1)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
)
)
elif cmd_str == OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True:
active_sd_card = int(input("Please specify active SD card [0/1]: "))
if active_sd_card not in [0, 1]:
_LOGGER.warning("Invalid SD card specified. Try again")
break
q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
user_data=bytes([active_sd_card]),
)
)
elif cmd_str == OpCode.GET_HK:
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif cmd_str == OpCode.SET_PREF_SD:
q.add_log_cmd("Set preferred SD card")
pref_sd = int(
input("Specify which SD card to set as the preferred one (0/1): ")
)
if pref_sd not in [0, 1]:
raise ValueError("Only 0 or 1 allowed for preferred SD card")
q.add_pus_tc(
create_load_param_cmd(
parameter=create_scalar_u8_parameter(
object_id=CORE_CONTROLLER_ID,
domain_id=0,
unique_id=ParamId.PREF_SD,
parameter=pref_sd,
),
apid=0,
)
)
elif cmd_str == OpCode.CP_HELPER:
cp_recursive = int(input("Copy recursively (0/1) ?: "))
if cp_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
cp_force = int(input("Copy with force option(0/1) ?: "))
if cp_force not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([cp_recursive, cp_force])
user_data.extend(packet_source_dest_path("Copy"))
q.add_log_cmd(Info.CP_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data)
)
elif cmd_str == OpCode.MV_HELPER:
user_data = packet_source_dest_path("Move")
q.add_log_cmd(Info.MV_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data)
)
elif cmd_str == OpCode.RM_HELPER:
rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: "))
if rm_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
rm_force = int(input("Remove with force (-f) option (0/1) ?: "))
if rm_force not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([rm_recursive, rm_force])
removed_file_or_dir = input("Specify absolute path to be removed: ")
user_data.extend(removed_file_or_dir.encode())
user_data.append(0)
q.add_log_cmd(Info.RM_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
)
elif cmd_str == OpCode.LIST_DIR_INTO_FILE:
q.add_log_cmd(Info.LIST_DIR_INTO_FILE)
user_data = list_directory_base_user_data()
dest_file_path = input("Destination file path: ")
user_data.extend(dest_file_path.encode())
user_data.append(0)
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data
)
)
elif cmd_str == OpCode.LIST_DIR_DUMP_DIRECTLY:
q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY)
user_data = list_directory_base_user_data()
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
)
)
elif cmd_str == OpCode.MKDIR_HELPER:
q.add_log_cmd(Info.MKDIR_HELPER)
user_data = input("Specify absolute path of newly created directory: ")
user_data = bytearray(user_data.encode())
user_data.append(0)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
)
elif cmd_str == OpCode.ENABLE_SCHEDULER:
q.add_log_cmd(Info.ENABLE_SCHEDULER)
q.add_pus_tc(create_enable_tc_sched_cmd())
elif cmd_str == OpCode.DISABLE_SCHEDULER:
q.add_log_cmd(Info.DISABLE_SCHEDULER)
q.add_pus_tc(create_disable_tc_sched_cmd())
elif cmd_str == OpCode.UPDATE_LEAP_SECONDS:
q.add_log_cmd(Info.UPDATE_LEAP_SECONDS)
leap_seconds = int(input("Specify new Leap Seconds Value: ")).to_bytes(
length=2, signed=False, byteorder="big"
)
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.UPDATE_LEAP_SECONRS, leap_seconds
)
)
else:
_LOGGER.warning(
f"Unknown operation code {cmd_str} for core controller commands"
)
def create_systemctl_cmd(systemctl_cmd: SystemctlCmd, unit_name: str):
cmd_data = bytearray([systemctl_cmd])
cmd_data.extend(unit_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR,
user_data=cmd_data,
)
def list_directory_base_user_data() -> bytearray:
all_opt = int(input("Use all (-a) option (0/1) ?: "))
if all_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
recursive_opt = int(input("Use recursive (-R) option (0/1) ?: "))
if recursive_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
compression_opt = int(input("Compress target file (0/1) ?: "))
if compression_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
listing_path = input("Specify listing path (absolute path): ")
user_data = bytearray([all_opt, recursive_opt, compression_opt])
user_data.extend(listing_path.encode())
user_data.append(0)
return user_data
def packet_source_dest_path(context: str) -> bytes:
source = input(f"Specify {context} source file: ")
dest = input(f"Specify {context} destination file: ")
raw_src_dest = bytearray(source.encode())
raw_src_dest.append(0)
raw_src_dest.extend(dest.encode())
raw_src_dest.append(0)
return raw_src_dest
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
user_data=bytes([chip, copy]),
)
)
def create_full_reboot_cmds() -> PusTelecommand:
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
def determine_reboot_params() -> Tuple[bool, Chip, Copy]:
reboot_self = input("Reboot self? [y/n]: ")
if reboot_self in ["y", "yes", "1"]:
_LOGGER.info("Rebooting currently running image")
return True, Chip.NONE, Copy.NONE
_LOGGER.info("Rebooting image specified by chip and copy")
chip, copy = determine_chip_and_copy()
return False, chip, copy
def determine_chip_and_copy() -> Tuple[Chip, Copy]:
while True:
chip_select = input("Chip select [0/1]: ")
if chip_select in ["0", "1"]:
if chip_select == "0":
chip_select = Chip.CHIP_0
else:
chip_select = Chip.CHIP_1
break
else:
_LOGGER.warning("Invalid chip select value. Try again")
while True:
copy_select = input("Copy select [0/1]: ")
if copy_select in ["0", "1"]:
if copy_select == "0":
copy_select = Copy.COPY_0_NOM
else:
copy_select = Copy.COPY_1_GOLD
break
else:
_LOGGER.warning("Invalid copy select value. Try again")
return chip_select, copy_select
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
chip, copy = determine_chip_and_copy()
user_data = bytearray([chip, copy])
custom_file_name = input("Use custom filename [y/n] ?: ")
if custom_file_name.lower() in ["y", "yes", "1"]:
custom_file_name = input("Specify custom filename: ")
user_data.extend(custom_file_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data
)
def add_xsc_reboot_cmd(
q: DefaultPusQueueHelper,
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
):
if reboot_self:
q.add_log_cmd("Packing reboot command for current image")
else:
q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}")
q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy))
def create_xsc_reboot_cmds(
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
) -> PusTelecommand:
tc_data = bytearray()
if reboot_self:
tc_data.append(True)
else:
tc_data.append(False)
tc_data.append(chip)
tc_data.append(copy)
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data
)
def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.HK:
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage}"
)
pw.dlog(printout)
pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[inc_len:], num_vars=3))
def handle_core_ctrl_action_replies(
action_id: int, pw: PrintWrapper, custom_data: bytes
):
if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
handle_reboot_mechanism_info_reply(pw, custom_data)
elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
handle_list_dir_dump_reply(pw, custom_data)
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
pw.dlog("Received reboot mechansm information")
fmt_str = "!BIIIIIBBBBBBBB"
inc_len = struct.calcsize(fmt_str)
if len(custom_data) < inc_len:
raise ValueError(f"Received custom data shorter than expected {inc_len}")
(
enabled,
max_count,
img00_count,
img01_count,
img10_count,
img11_count,
img00_lock,
img01_lock,
img10_lock,
img11_lock,
last_chip,
last_copy,
next_chip,
next_copy,
) = struct.unpack(fmt_str, custom_data[:inc_len])
pw.dlog(f"Enabled: {enabled}")
pw.dlog(f"Max Count: {max_count}")
pw.dlog(f"Count 00: {img00_count}")
pw.dlog(f"Count 01: {img01_count}")
pw.dlog(f"Count 10: {img10_count}")
pw.dlog(f"Count 11: {img11_count}")
pw.dlog(f"Lock 00: {img00_lock}")
pw.dlog(f"Lock 01: {img01_lock}")
pw.dlog(f"Lock 10: {img10_lock}")
pw.dlog(f"Lock 11: {img11_lock}")
pw.dlog(f"Last Chip: {last_chip}")
pw.dlog(f"Last Copy: {last_copy}")
pw.dlog(f"Next Chip: {next_chip}")
pw.dlog(f"Next Copy: {next_copy}")
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
if len(custom_data) < 4:
_LOGGER.warning("Data unexpectedly small")
return
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
compressed = custom_data[8]
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
# Include length of NULL termination
file_data_offset = 9 + len(ls_cmd) + 1
pw.dlog(
f"Received directory listing dump for ls command {ls_cmd}. "
f"Chunk {seq_idx + 1}/{total_chunks}"
)
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
if seq_idx_ == 0 and path_.exists():
os.remove(path_)
if compressed:
path = Path("dir_listing.txt.gz")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
)
with open(path, "ab") as listing_file:
listing_file.write(custom_data[file_data_offset:])
else:
path = Path("dir_listing.txt")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
with open(path, "a") as listing_file:
listing_file_str = custom_data[file_data_offset:].decode()
listing_file.write(listing_file_str)
if seq_idx + 1 == total_chunks:
pw.dlog("Full directory listing: ")
with open("dir_listing.txt", "r") as listing_file:
print(listing_file.read())