724 lines
26 KiB
Python
724 lines
26 KiB
Python
import enum
|
|
import logging
|
|
import os
|
|
import struct
|
|
from pathlib import Path
|
|
from typing import Tuple
|
|
|
|
from spacepackets.ecss import PusTelecommand
|
|
from tmtccmd.config import CmdTreeNode
|
|
|
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
|
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
|
from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command
|
|
from tmtccmd.pus.s20_fsfw_param import (
|
|
create_scalar_u8_parameter,
|
|
create_load_param_cmd,
|
|
)
|
|
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
|
from tmtccmd.pus.s11_tc_sched import (
|
|
create_enable_tc_sched_cmd,
|
|
create_disable_tc_sched_cmd,
|
|
)
|
|
|
|
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
|
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@enum.unique
class SdState(enum.IntEnum):
    """Integer codes for the state of an SD card: off, on or mounted."""

    OFF = 0
    ON = 1
    MOUNTED = 2
|
|
|
|
|
|
@enum.unique
class SdCardSelect(enum.IntEnum):
    """Integer codes selecting SD card 0, SD card 1, both cards or none."""

    SD_0 = 0
    SD_1 = 1
    BOTH = 2
    NONE = 3
|
|
|
|
|
|
class ActionId(enum.IntEnum):
    """Action IDs passed to ``create_action_cmd`` for the core controller.

    The numeric values are part of the command interface and must not be
    changed here.
    """

    # Announcements
    ANNOUNCE_VERSION = 1
    ANNOUNCE_CURRENT_IMAGE = 2
    ANNOUNCE_BOOT_COUNTS = 3

    # Reboot file handling / reboot counters
    SWITCH_REBOOT_FILE_HANDLING = 5
    RESET_REBOOT_COUNTER = 6
    SWITCH_IMG_LOCK = 7
    SET_MAX_REBOOT_CNT = 8
    READ_REBOOT_MECHANISM_INFO = 9

    # OBSW update
    UPDATE_OBSW_FROM_SD_0 = 10
    UPDATE_OBSW_FROM_SD_1 = 11
    UPDATE_OBSW_FROM_TMP = 12

    # SD card switching and auto-switch feature
    SWITCH_TO_SD_0 = 16
    SWITCH_TO_SD_1 = 17
    SWITCH_TO_BOTH_SD_CARDS = 18
    AUTO_SWITCH_ENABLE = 19
    AUTO_SWITCH_DISABLE = 20

    # Reboots
    XSC_REBOOT = 32
    FULL_REBOOT = 34

    # Shell / systemctl execution
    EXECUTE_SHELL_CMD_BLOCKING = 40
    EXECUTE_SHELL_CMD_NON_BLOCKING = 41
    SYSTEMCTL_CMD_EXECUTOR = 42

    # Filesystem helpers
    LIST_DIR_INTO_FILE = 50
    LIST_DIR_DUMP_DIRECTLY = 51
    CP_HELPER = 52
    MV_HELPER = 53
    RM_HELPER = 54
    MKDIR_HELPER = 55

    ENABLE_SCHEDULER = 56

    # The misspelled name is kept as the canonical member so existing callers
    # and printed member names stay unchanged.
    UPDATE_LEAP_SECONRS = 60
    # Correctly spelled alias (same value, so it resolves to the member above).
    UPDATE_LEAP_SECONDS = 60
|
|
|
|
|
|
@enum.unique
class ParamId(enum.IntEnum):
    """Parameter IDs; ``PREF_SD`` is the ID used when setting the preferred SD card."""

    PREF_SD = 0
|
|
|
|
|
|
@enum.unique
class SetId(enum.IntEnum):
    """Housekeeping set IDs; ``HK`` is the set requested and parsed in this module."""

    HK = 5
|
|
|
|
|
|
class OpCode:
    """Command strings accepted by ``pack_core_commands``.

    Every public attribute must have an attribute of the same name in ``Info``
    because ``create_core_node`` pairs the two classes by attribute name. Do
    not add non-constant public attributes to this class.
    """

    # Announcements and housekeeping
    ANNOUNCE_VERSION = "announce_version"
    ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
    ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
    GET_HK = "get_hk"

    # Shell / systemctl execution
    EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
    EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
    SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"

    # Filesystem helpers
    LIST_DIR_INTO_FILE = "list_dir_into_file"
    LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly"
    CP_HELPER = "cp_helper"
    MV_HELPER = "mv_helper"
    RM_HELPER = "rm_helper"
    MKDIR_HELPER = "mkdir_helper"

    # SD card handling
    SET_PREF_SD = "set_pref_sd"
    SWITCH_TO_SD_0 = "switch_to_sd_0"
    SWITCH_TO_SD_1 = "switch_to_sd_1"
    SWITCH_TO_BOTH_SD_CARDS = "switch_to_both_sd_cards"

    # Reboots
    REBOOT_XSC = "reboot_xsc"
    REBOOT_FULL = "reboot_regular"
    XSC_REBOOT_SELF = "reboot_self"
    XSC_REBOOT_0_0 = "reboot_00"
    XSC_REBOOT_0_1 = "reboot_01"
    XSC_REBOOT_1_0 = "reboot_10"
    XSC_REBOOT_1_1 = "reboot_11"

    # OBSW update
    OBSW_UPDATE_FROM_SD_0 = "obsw_update_sd0"
    OBSW_UPDATE_FROM_SD_1 = "obsw_update_sd1"
    OBSW_UPDATE_FROM_TMP = "obsw_update_tmp"

    # Reboot file handling / reboot counters
    READ_REBOOT_MECHANISM_INFO = "rbh_info"
    ENABLE_REBOOT_FILE_HANDLING = "rwd_on"
    DISABLE_REBOOT_FILE_HANDLING = "rwd_off"
    RESET_ALL_REBOOT_COUNTERS = "rwd_reset_a"
    RWD_RESET_REBOOT_COUNTER_00 = "rwd_reset_00"
    RWD_RESET_REBOOT_COUNTER_01 = "rwd_reset_01"
    RWD_RESET_REBOOT_COUNTER_10 = "rwd_reset_10"
    RWD_RESET_REBOOT_COUNTER_11 = "rwd_reset_11"
    RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"

    # Auto-switch feature
    AUTO_SWITCH_ENABLE = "auto_switch_enable"
    AUTO_SWITCH_DISABLE = "auto_switch_disable"

    # TC scheduler
    ENABLE_SCHEDULER = "enable_scheduler"
    DISABLE_SCHEDULER = "disable_scheduler"

    UPDATE_LEAP_SECONDS = "leap_seconds_update"
|
|
|
|
|
|
class Info:
    """Human-readable descriptions for the command strings in ``OpCode``.

    ``create_core_node`` looks up each ``OpCode`` attribute name on this class,
    so every ``OpCode`` constant needs a same-named entry here. Do not add
    non-constant public attributes to this class.
    """

    # Announcements and housekeeping
    ANNOUNCE_VERSION = "Announce version"
    ANNOUNCE_CURRENT_IMAGE = "Announce current image"
    ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
    GET_HK = "Get HK set"

    # Shell / systemctl execution
    SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
    EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
    EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"

    # SD card handling
    SET_PREF_SD = "Set preferred SD card"
    SWITCH_TO_SD_0 = "Switch to SD card 0"
    SWITCH_TO_SD_1 = "Switch to SD card 1"
    SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"

    # Reboots
    REBOOT_XSC = "XSC reboot with prompt"
    REBOOT_FULL = "Full regular reboot"
    XSC_REBOOT_SELF = "Reboot Self"
    XSC_REBOOT_0_0 = "Reboot to 0 0"
    XSC_REBOOT_0_1 = "Reboot to 0 1"
    XSC_REBOOT_1_0 = "Reboot to 1 0"
    XSC_REBOOT_1_1 = "Reboot to 1 1"

    # OBSW update
    OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
    OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
    OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"

    # Filesystem helpers
    LIST_DIR_INTO_FILE = "List directory, dump output into file"
    LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly"
    CP_HELPER = "Filesystem Copy Helper"
    MV_HELPER = "Filesystem Move Helper"
    RM_HELPER = "Filesystem Removal Helper"
    MKDIR_HELPER = "Filesystem Directory Creation Helper"

    # Reboot file handling / reboot counters
    # Fixed typo: was "mechansm".
    READ_REBOOT_MECHANISM_INFO = "Read reboot mechanism information"
    ENABLE_REBOOT_FILE_HANDLING = "Enable reboot file handling"
    DISABLE_REBOOT_FILE_HANDLING = "Disable reboot file handling"
    RESET_ALL_REBOOT_COUNTERS = "Reset all reboot counters"
    RWD_RESET_REBOOT_COUNTER_00 = "Reset reboot counter 0 0"
    # Fixed copy-paste error: this entry previously also read "0 0".
    RWD_RESET_REBOOT_COUNTER_01 = "Reset reboot counter 0 1"
    RWD_RESET_REBOOT_COUNTER_10 = "Reset reboot counter 1 0"
    RWD_RESET_REBOOT_COUNTER_11 = "Reset reboot counter 1 1"
    # Fixed: previously held the op code string "rwd_max_cnt" instead of a
    # description.
    RWD_SET_MAX_REBOOT_CNT = "Set maximum reboot count"

    # Auto-switch feature
    AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
    AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"

    # TC scheduler
    ENABLE_SCHEDULER = "Enable scheduler"
    DISABLE_SCHEDULER = "Disable scheduler"

    UPDATE_LEAP_SECONDS = "Updates the Leap Seconds"
|
|
|
|
|
|
@enum.unique
class Chip(enum.IntEnum):
    """Chip selection codes; ``NONE`` is the default when no chip is specified."""

    CHIP_0 = 0
    CHIP_1 = 1
    NONE = 2
|
|
|
|
|
|
@enum.unique
class Copy(enum.IntEnum):
    """Image copy selection codes; ``NONE`` is the default when no copy is specified."""

    COPY_0_NOM = 0
    COPY_1_GOLD = 1
    NONE = 2
|
|
|
|
|
|
@enum.unique
class SystemctlCmd(enum.IntEnum):
    """Command type codes sent as the first byte of a systemctl action command."""

    START = 0
    STOP = 1
    RESTART = 2
|
|
|
|
|
|
def create_core_node() -> CmdTreeNode:
    """Build the "core" command tree node with one child per ``OpCode`` constant.

    Each child is named after the op code string and described by the
    attribute of the same name on ``Info``.
    """
    # Attribute names are taken from OpCode only; Info is read by the same
    # name so the op code and its description always belong together.
    names = [name for name in dir(OpCode) if not name.startswith("__")]
    descriptions = {getattr(OpCode, name): getattr(Info, name) for name in names}

    core_node = CmdTreeNode("core", "Core Controller", hide_children_for_print=True)
    for op_code in descriptions:
        core_node.add_child(CmdTreeNode(op_code, descriptions[op_code]))
    return core_node
|
|
|
|
|
|
def pack_core_commands(  # noqa C901
    q: DefaultPusQueueHelper, cmd_str: str
):  # noqa: C901 , complexity okay here
    """Add the telecommand(s) for one core controller op code to the queue.

    :param q: Queue helper that receives log entries (``add_log_cmd``) and
        telecommands (``add_pus_tc``).
    :param cmd_str: One of the strings defined in ``OpCode``. Unknown strings
        are only logged as a warning; nothing is queued for them.
    :raises ValueError: If an interactive prompt is answered with a value
        outside the allowed range, or with text that is not an integer where
        an integer is required.

    Several branches prompt on the console via ``input``.
    """
    if cmd_str == OpCode.ANNOUNCE_VERSION:
        q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
        q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
    elif cmd_str == OpCode.ANNOUNCE_CURRENT_IMAGE:
        q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
        )
    elif cmd_str == OpCode.ANNOUNCE_BOOT_COUNTS:
        q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}")
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
        )
    elif cmd_str == OpCode.REBOOT_XSC:
        reboot_self, chip_select, copy_select = determine_reboot_params()
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=reboot_self,
            chip=chip_select,
            copy=copy_select,
        )
    elif cmd_str == OpCode.REBOOT_FULL:
        q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
            )
        )
    elif cmd_str == OpCode.XSC_REBOOT_SELF:
        add_xsc_reboot_cmd(q=q, reboot_self=True)
    elif cmd_str == OpCode.SYSTEMCTL_CMD_EXECUTOR:
        print("systemctl command types: ")
        for entry in SystemctlCmd:
            print(f"{entry}: {entry.name}")
        systemctl_cmd = SystemctlCmd(
            int(input("Specify systemctl command type by key: "))
        )
        unit_name = input("Specify unit name: ")
        q.add_pus_tc(create_systemctl_cmd(systemctl_cmd, unit_name))
    elif cmd_str == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
        custom_cmd = input("Please specify command to execute: ")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
                user_data=custom_cmd.encode(),
            )
        )
    elif cmd_str == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
        custom_cmd = input("Please specify command to execute: ")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
                user_data=custom_cmd.encode(),
            )
        )
    elif cmd_str == OpCode.XSC_REBOOT_0_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
        )
    elif cmd_str == OpCode.XSC_REBOOT_0_1:
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=False,
            chip=Chip.CHIP_0,
            copy=Copy.COPY_1_GOLD,
        )
    elif cmd_str == OpCode.XSC_REBOOT_1_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
        )
    elif cmd_str == OpCode.XSC_REBOOT_1_1:
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=False,
            chip=Chip.CHIP_1,
            copy=Copy.COPY_1_GOLD,
        )
    elif cmd_str == OpCode.READ_REBOOT_MECHANISM_INFO:
        q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
            )
        )
    elif cmd_str == OpCode.DISABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Disabling reboot file handling")
        # Single byte payload: 0 disables, 1 enables.
        user_data = bytearray([0])
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=user_data,
            )
        )
    elif cmd_str == OpCode.ENABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Enabling reboot file handling")
        user_data = bytearray([1])
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=user_data,
            )
        )
    elif cmd_str == OpCode.RESET_ALL_REBOOT_COUNTERS:
        q.add_log_cmd("Resetting all reboot counters")
        # No user data here; the per-image reset below sends chip and copy.
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.RESET_REBOOT_COUNTER,
            )
        )
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_00:
        reset_specific_boot_counter(q, 0, 0)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_01:
        reset_specific_boot_counter(q, 0, 1)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_10:
        reset_specific_boot_counter(q, 1, 0)
    elif cmd_str == OpCode.RWD_RESET_REBOOT_COUNTER_11:
        reset_specific_boot_counter(q, 1, 1)
    elif cmd_str == OpCode.RWD_SET_MAX_REBOOT_CNT:
        max_count = int(input("Set new maximum reboot threshold [1, 50]: "))
        if max_count < 1 or max_count > 50:
            raise ValueError("Invalid value, must be in range 1 to 50")
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID,
                ActionId.SET_MAX_REBOOT_CNT,
                user_data=bytes([max_count]),
            )
        )
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_0:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_SD_1:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
    elif cmd_str == OpCode.OBSW_UPDATE_FROM_TMP:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
    elif cmd_str == OpCode.AUTO_SWITCH_ENABLE:
        q.add_log_cmd(Info.AUTO_SWITCH_ENABLE)
        chip, copy = determine_chip_and_copy()
        user_data = bytes([chip, copy])
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_ENABLE, user_data
            )
        )
    elif cmd_str == OpCode.AUTO_SWITCH_DISABLE:
        q.add_log_cmd(Info.AUTO_SWITCH_DISABLE)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_DISABLE)
        )
    elif cmd_str == OpCode.SWITCH_TO_SD_0:
        q.add_log_cmd(Info.SWITCH_TO_SD_0)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
            )
        )
    elif cmd_str == OpCode.SWITCH_TO_SD_1:
        q.add_log_cmd(Info.SWITCH_TO_SD_1)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
            )
        )
    elif cmd_str == OpCode.SWITCH_TO_BOTH_SD_CARDS:
        # Keep prompting until a valid card index is entered, so an invalid
        # value can never end up in the telecommand payload.
        while True:
            active_sd_card = int(input("Please specify active SD card [0/1]: "))
            if active_sd_card in [0, 1]:
                break
            _LOGGER.warning("Invalid SD card specified. Try again")
        q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
                user_data=bytes([active_sd_card]),
            )
        )
    elif cmd_str == OpCode.GET_HK:
        q.add_log_cmd("Requesting housekeeping set")
        sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
        q.add_pus_tc(generate_one_hk_command(sid))
    elif cmd_str == OpCode.SET_PREF_SD:
        q.add_log_cmd("Set preferred SD card")
        pref_sd = int(
            input("Specify which SD card to set as the preferred one (0/1): ")
        )
        if pref_sd not in [0, 1]:
            raise ValueError("Only 0 or 1 allowed for preferred SD card")
        q.add_pus_tc(
            create_load_param_cmd(
                parameter=create_scalar_u8_parameter(
                    object_id=CORE_CONTROLLER_ID,
                    domain_id=0,
                    unique_id=ParamId.PREF_SD,
                    parameter=pref_sd,
                ),
                apid=0,
            )
        )
    elif cmd_str == OpCode.CP_HELPER:
        cp_recursive = int(input("Copy recursively (0/1) ?: "))
        if cp_recursive not in [0, 1]:
            raise ValueError("Invalid value, only 0 or 1 allowed")
        cp_force = int(input("Copy with force option(0/1) ?: "))
        if cp_force not in [0, 1]:
            raise ValueError("Invalid value, only 0 or 1 allowed")
        # Payload: two option bytes followed by NUL-terminated source and
        # destination paths.
        user_data = bytearray([cp_recursive, cp_force])
        user_data.extend(packet_source_dest_path("Copy"))
        q.add_log_cmd(Info.CP_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data)
        )
    elif cmd_str == OpCode.MV_HELPER:
        user_data = packet_source_dest_path("Move")
        q.add_log_cmd(Info.MV_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data)
        )
    elif cmd_str == OpCode.RM_HELPER:
        rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: "))
        if rm_recursive not in [0, 1]:
            raise ValueError("Invalid value, only 0 or 1 allowed")
        rm_force = int(input("Remove with force (-f) option (0/1) ?: "))
        if rm_force not in [0, 1]:
            raise ValueError("Invalid value, only 0 or 1 allowed")
        user_data = bytearray([rm_recursive, rm_force])
        removed_file_or_dir = input("Specify absolute path to be removed: ")
        user_data.extend(removed_file_or_dir.encode())
        user_data.append(0)
        q.add_log_cmd(Info.RM_HELPER)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
        )
    elif cmd_str == OpCode.LIST_DIR_INTO_FILE:
        q.add_log_cmd(Info.LIST_DIR_INTO_FILE)
        user_data = list_directory_base_user_data()
        dest_file_path = input("Destination file path: ")
        user_data.extend(dest_file_path.encode())
        user_data.append(0)
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data
            )
        )
    elif cmd_str == OpCode.LIST_DIR_DUMP_DIRECTLY:
        q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY)
        user_data = list_directory_base_user_data()
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
            )
        )
    elif cmd_str == OpCode.MKDIR_HELPER:
        q.add_log_cmd(Info.MKDIR_HELPER)
        new_dir = input("Specify absolute path of newly created directory: ")
        user_data = bytearray(new_dir.encode())
        user_data.append(0)
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
        )
    elif cmd_str == OpCode.ENABLE_SCHEDULER:
        q.add_log_cmd(Info.ENABLE_SCHEDULER)
        q.add_pus_tc(create_enable_tc_sched_cmd())
    elif cmd_str == OpCode.DISABLE_SCHEDULER:
        q.add_log_cmd(Info.DISABLE_SCHEDULER)
        q.add_pus_tc(create_disable_tc_sched_cmd())
    elif cmd_str == OpCode.UPDATE_LEAP_SECONDS:
        q.add_log_cmd(Info.UPDATE_LEAP_SECONDS)
        # Encoded as an unsigned 16-bit big-endian value.
        leap_seconds = int(input("Specify new Leap Seconds Value: ")).to_bytes(
            length=2, signed=False, byteorder="big"
        )
        q.add_pus_tc(
            create_action_cmd(
                CORE_CONTROLLER_ID, ActionId.UPDATE_LEAP_SECONRS, leap_seconds
            )
        )
    else:
        _LOGGER.warning(
            f"Unknown operation code {cmd_str} for core controller commands"
        )
|
|
|
|
|
|
def create_systemctl_cmd(systemctl_cmd: SystemctlCmd, unit_name: str):
    """Create the action command which runs a systemctl command for a unit.

    :param systemctl_cmd: Command type, sent as the first payload byte.
    :param unit_name: Unit name, appended to the payload in encoded form.
    :return: The value returned by ``create_action_cmd``.
    """
    payload = bytearray((systemctl_cmd,))
    payload += unit_name.encode()
    return create_action_cmd(
        CORE_CONTROLLER_ID, ActionId.SYSTEMCTL_CMD_EXECUTOR, payload
    )
|
|
|
|
|
|
def list_directory_base_user_data() -> bytearray:
    """Prompt for the directory listing options and build the base payload.

    The payload consists of three option bytes (all, recursive, compression)
    followed by the NUL-terminated listing path.

    :raises ValueError: As soon as one option is answered with an integer
        other than 0 or 1, or with text that is not an integer.
    """
    option_prompts = (
        "Use all (-a) option (0/1) ?: ",
        "Use recursive (-R) option (0/1) ?: ",
        "Compress target file (0/1) ?: ",
    )
    payload = bytearray()
    for prompt in option_prompts:
        flag = int(input(prompt))
        if flag != 0 and flag != 1:
            raise ValueError("Invalid value, only 0 or 1 allowed")
        payload.append(flag)
    payload += input("Specify listing path (absolute path): ").encode()
    payload.append(0)
    return payload
|
|
|
|
|
|
def packet_source_dest_path(context: str) -> bytearray:
    """Prompt for a source and destination path and pack them into one buffer.

    :param context: Word inserted into both prompts, e.g. "Copy" or "Move".
    :return: A mutable buffer holding the encoded source path, a NUL byte, the
        encoded destination path and a closing NUL byte.
    """
    source = input(f"Specify {context} source file: ")
    dest = input(f"Specify {context} destination file: ")
    # Each path is terminated by a NUL byte so they can be told apart.
    raw_src_dest = bytearray(source.encode())
    raw_src_dest.append(0)
    raw_src_dest.extend(dest.encode())
    raw_src_dest.append(0)
    return raw_src_dest
|
|
|
|
|
|
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
    """Queue the command which resets the boot counter of one image.

    :param q: Queue helper receiving the log entry and the telecommand.
    :param chip: Chip index, sent as the first payload byte.
    :param copy: Copy index, sent as the second payload byte.
    """
    q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
    payload = bytes((chip, copy))
    reset_cmd = create_action_cmd(
        CORE_CONTROLLER_ID, ActionId.RESET_REBOOT_COUNTER, payload
    )
    q.add_pus_tc(reset_cmd)
|
|
|
|
|
|
def create_full_reboot_cmds() -> PusTelecommand:
    """Create the action command with ``ActionId.FULL_REBOOT`` for the core controller."""
    full_reboot_cmd = create_action_cmd(CORE_CONTROLLER_ID, ActionId.FULL_REBOOT)
    return full_reboot_cmd
|
|
|
|
|
|
def determine_reboot_params() -> Tuple[bool, Chip, Copy]:
    """Ask whether to reboot the running image or a specific chip/copy.

    :return: ``(True, Chip.NONE, Copy.NONE)`` when the running image shall be
        rebooted, otherwise ``(False, chip, copy)`` with the values obtained
        from ``determine_chip_and_copy``.
    """
    answer = input("Reboot self? [y/n]: ")
    # Normalise the answer so "Y" or "Yes " are accepted as well, like the
    # case-insensitive yes/no prompt in pack_obsw_update_cmd.
    if answer.strip().lower() in ["y", "yes", "1"]:
        _LOGGER.info("Rebooting currently running image")
        return True, Chip.NONE, Copy.NONE
    _LOGGER.info("Rebooting image specified by chip and copy")
    chip, copy = determine_chip_and_copy()
    return False, chip, copy
|
|
|
|
|
|
def determine_chip_and_copy() -> Tuple[Chip, Copy]:
    """Prompt for a chip and a copy until both answers are "0" or "1".

    :return: The selected ``Chip`` and ``Copy`` members. The chip is always
        asked first.
    """

    def ask_until_valid(question, choices, complaint):
        # Repeat the prompt until the answer is one of the known keys.
        while True:
            answer = input(question)
            if answer in choices:
                return choices[answer]
            _LOGGER.warning(complaint)

    chip_select = ask_until_valid(
        "Chip select [0/1]: ",
        {"0": Chip.CHIP_0, "1": Chip.CHIP_1},
        "Invalid chip select value. Try again",
    )
    copy_select = ask_until_valid(
        "Copy select [0/1]: ",
        {"0": Copy.COPY_0_NOM, "1": Copy.COPY_1_GOLD},
        "Invalid copy select value. Try again",
    )
    return chip_select, copy_select
|
|
|
|
|
|
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
    """Create an OBSW update action command after prompting for its parameters.

    The payload starts with the chip and copy bytes. If the user asks for a
    custom filename, its encoded form is appended.

    :param action_id: Action ID placed in the created command.
    """
    chip, copy = determine_chip_and_copy()
    payload = bytearray((chip, copy))
    wants_custom_name = input("Use custom filename [y/n] ?: ")
    if wants_custom_name.lower() in ("y", "yes", "1"):
        payload += input("Specify custom filename: ").encode()
    return create_action_cmd(CORE_CONTROLLER_ID, action_id, payload)
|
|
|
|
|
|
def add_xsc_reboot_cmd(
    q: DefaultPusQueueHelper,
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
):
    """Queue a log entry and the XSC reboot telecommand.

    :param q: Queue helper receiving the log entry and the telecommand.
    :param reboot_self: If true, the log entry refers to the current image;
        otherwise it names ``chip`` and ``copy``.
    :param chip: Chip passed on to ``create_xsc_reboot_cmds``.
    :param copy: Copy passed on to ``create_xsc_reboot_cmds``.
    """
    log_text = (
        "Packing reboot command for current image"
        if reboot_self
        else f"Packing reboot command for chip {chip} and copy {copy}"
    )
    q.add_log_cmd(log_text)
    q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy))
|
|
|
|
|
|
def create_xsc_reboot_cmds(
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
) -> PusTelecommand:
    """Create the XSC reboot action command.

    The payload has three bytes: 1 or 0 depending on the truthiness of
    ``reboot_self``, then ``chip``, then ``copy``.
    """
    self_flag = 1 if reboot_self else 0
    payload = bytearray((self_flag, chip, copy))
    return create_action_cmd(CORE_CONTROLLER_ID, ActionId.XSC_REBOOT, payload)
|
|
|
|
|
|
def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
    """Log the content of a core controller housekeeping set.

    Only ``SetId.HK`` is handled; any other set ID is ignored. The set starts
    with three network-order floats, the remaining bytes are passed on as the
    validity buffer.

    :param pw: Print wrapper used for the log output.
    :param set_id: ID of the received set.
    :param hk_data: Raw housekeeping data.
    """
    if set_id != SetId.HK:
        return
    layout = "!fff"
    values_len = struct.calcsize(layout)
    temperature, ps_voltage, pl_voltage = struct.unpack(layout, hk_data[:values_len])
    pw.dlog(
        f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
        f"PL Voltage [mV] {pl_voltage}"
    )
    validity_str = get_validity_buffer_str(
        validity_buffer=hk_data[values_len:], num_vars=3
    )
    pw.dlog(validity_str)
|
|
|
|
|
|
def handle_core_ctrl_action_replies(
    action_id: int, pw: PrintWrapper, custom_data: bytes
):
    """Forward an action reply to the handler for its action ID.

    Replies to the reboot mechanism info and direct directory listing dump
    actions are handled; all other action IDs are ignored.
    """
    if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
        handle_reboot_mechanism_info_reply(pw, custom_data)
        return
    if action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
        handle_list_dir_dump_reply(pw, custom_data)
|
|
|
|
|
|
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
    """Unpack and log the reply to the reboot mechanism info action.

    The reply holds 14 network-order fields: one byte, five unsigned 32-bit
    integers and eight further bytes. Each field is logged on its own line.

    :param pw: Print wrapper used for the log output.
    :param custom_data: Raw reply data; bytes beyond the expected length are
        ignored.
    :raises ValueError: If ``custom_data`` is shorter than the expected length.
    """
    pw.dlog("Received reboot mechanism information")
    fmt_str = "!BIIIIIBBBBBBBB"
    inc_len = struct.calcsize(fmt_str)
    if len(custom_data) < inc_len:
        raise ValueError(f"Received custom data shorter than expected {inc_len}")
    # One label per field, in the order the fields appear in the reply.
    labels = (
        "Enabled",
        "Max Count",
        "Count 00",
        "Count 01",
        "Count 10",
        "Count 11",
        "Lock 00",
        "Lock 01",
        "Lock 10",
        "Lock 11",
        "Last Chip",
        "Last Copy",
        "Next Chip",
        "Next Copy",
    )
    fields = struct.unpack(fmt_str, custom_data[:inc_len])
    for label, value in zip(labels, fields):
        pw.dlog(f"{label}: {value}")
|
|
|
|
|
|
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
    """Store one chunk of a directory listing dump in a local file.

    Reply layout as parsed here: chunk index (unsigned 32-bit), total number
    of chunks (unsigned 32-bit), compression flag (one byte), the
    NUL-terminated ls command, then the file data of this chunk.

    The data is appended to ``dir_listing.txt.gz`` if the compression flag is
    set and to ``dir_listing.txt`` otherwise. A file left over from an earlier
    dump is removed when the first chunk arrives. After the last chunk of an
    uncompressed dump the complete listing is printed.

    :param pw: Print wrapper used for the log output.
    :param custom_data: Raw reply data.
    """
    header_fmt = "!IIB"
    header_len = struct.calcsize(header_fmt)
    # The whole fixed header is read below, so all of it has to be present.
    if len(custom_data) < header_len:
        _LOGGER.warning("Data unexpectedly small")
        return
    seq_idx, total_chunks, compressed = struct.unpack_from(
        header_fmt, custom_data, 0
    )
    raw_ls_cmd = custom_data[header_len:].split(b"\x00")[0]
    # Only used for the log output, so undecodable bytes are replaced.
    ls_cmd = raw_ls_cmd.decode(errors="replace")
    # Offsets are counted in bytes (not decoded characters) and include the
    # NUL terminator of the ls command.
    file_data_offset = header_len + len(raw_ls_cmd) + 1
    pw.dlog(
        f"Received directory listing dump for ls command {ls_cmd}. "
        f"Chunk {seq_idx + 1}/{total_chunks}"
    )

    path = Path("dir_listing.txt.gz" if compressed else "dir_listing.txt")
    # The first chunk starts a new dump: drop the file of a previous one.
    if seq_idx == 0 and path.exists():
        path.unlink()
    pw.dlog(f"Compression option: {compressed}. Dumping file into {path}")
    # Chunks are appended as raw bytes: a chunk border may split a multi-byte
    # character, so decoding chunk by chunk is not safe.
    with open(path, "ab") as listing_file:
        listing_file.write(custom_data[file_data_offset:])

    # Only the uncompressed listing is readable text.
    if not compressed and seq_idx + 1 == total_chunks:
        pw.dlog("Full directory listing: ")
        with open(path, "r", encoding="utf-8", errors="replace") as listing_file:
            print(listing_file.read())
|