eive-tmtc/eive_tmtc/tmtc/core.py
Robin Mueller 5456d79965
All checks were successful
EIVE/-/pipeline/head This commit looks good
the endianness is actually really important here
2023-12-13 11:23:55 +01:00

771 lines
28 KiB
Python

import enum
import logging
import os
import struct
from pathlib import Path
from typing import Tuple
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.s11_tc_sched import (
create_enable_tc_sched_cmd,
create_disable_tc_sched_cmd,
)
from tmtccmd.pus.s20_fsfw_param import (
create_load_param_cmd,
create_scalar_u8_parameter,
)
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
_LOGGER = logging.getLogger(__name__)
class SdState(enum.IntEnum):
OFF = 0
ON = 1
MOUNTED = 2
class SdCardSelect(enum.IntEnum):
SD_0 = 0
SD_1 = 1
BOTH = 2
NONE = 3
class ActionId(enum.IntEnum):
ANNOUNCE_VERSION = 1
ANNOUNCE_CURRENT_IMAGE = 2
ANNOUNCE_BOOT_COUNTS = 3
SWITCH_REBOOT_FILE_HANDLING = 5
RESET_REBOOT_COUNTER = 6
SWITCH_IMG_LOCK = 7
SET_MAX_REBOOT_CNT = 8
READ_REBOOT_MECHANISM_INFO = 9
UPDATE_OBSW_FROM_SD_0 = 10
UPDATE_OBSW_FROM_SD_1 = 11
UPDATE_OBSW_FROM_TMP = 12
SWITCH_TO_SD_0 = 16
SWITCH_TO_SD_1 = 17
SWITCH_TO_BOTH_SD_CARDS = 18
AUTO_SWITCH_ENABLE = 19
AUTO_SWITCH_DISABLE = 20
XSC_REBOOT = 32
FULL_REBOOT = 34
EXECUTE_SHELL_CMD_BLOCKING = 40
EXECUTE_SHELL_CMD_NON_BLOCKING = 41
SYSTEMCTL_CMD_EXECUTOR = 42
LIST_DIR_INTO_FILE = 50
LIST_DIR_DUMP_DIRECTLY = 51
CP_HELPER = 52
MV_HELPER = 53
RM_HELPER = 54
MKDIR_HELPER = 55
ENABLE_SCHEDULER = 56
class ParamId(enum.IntEnum):
PREF_SD = 0
class SetId(enum.IntEnum):
HK = 5
class OpCode:
ANNOUNCE_VERSION = "announce_version"
ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
LIST_DIR_INTO_FILE = "list_dir_into_file"
LIST_DIR_DUMP_DIRECTLY = "list_dir_dump_directly"
CP_HELPER = "cp_helper"
MV_HELPER = "mv_helper"
RM_HELPER = "rm_helper"
MKDIR_HELPER = "mkdir_helper"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = ["reboot_xsc"]
XSC_REBOOT_SELF = ["reboot_self"]
XSC_REBOOT_0_0 = ["reboot_00"]
XSC_REBOOT_0_1 = ["reboot_01"]
XSC_REBOOT_1_0 = ["reboot_10"]
XSC_REBOOT_1_1 = ["reboot_11"]
REBOOT_FULL = ["reboot_regular"]
GET_HK = ["get_hk"]
OBSW_UPDATE_FROM_SD_0 = ["obsw_update_sd0"]
OBSW_UPDATE_FROM_SD_1 = ["obsw_update_sd1"]
OBSW_UPDATE_FROM_TMP = ["obsw_update_tmp"]
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
READ_REBOOT_MECHANISM_INFO = "rbh_info"
ENABLE_REBOOT_FILE_HANDLING = "rwd_on"
DISABLE_REBOOT_FILE_HANDLING = "rwd_off"
RESET_ALL_REBOOT_COUNTERS = "rwd_reset_a"
RWD_RESET_REBOOT_COUNTER_00 = "rwd_reset_00"
RWD_RESET_REBOOT_COUNTER_01 = "rwd_reset_01"
RWD_RESET_REBOOT_COUNTER_10 = "rwd_reset_10"
RWD_RESET_REBOOT_COUNTER_11 = "rwd_reset_11"
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
AUTO_SWITCH_ENABLE = "auto_switch_enable"
AUTO_SWITCH_DISABLE = "auto_switch_disable"
ENABLE_SCHEDULER = "enable_scheduler"
DISABLE_SCHEDULER = "disable_scheduler"
class Info:
ANNOUNCE_VERSION = "Announce version"
ANNOUNCE_CURRENT_IMAGE = "Announce current image"
ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"
SET_PREF_SD = "Set preferred SD card"
REBOOT_XSC = "XSC reboot with prompt"
REBOOT_FULL = "Full regular reboot"
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
READ_REBOOT_MECHANISM_INFO = "Read reboot mechansm information"
SWITCH_TO_SD_0 = "Switch to SD card 0"
SWITCH_TO_SD_1 = "Switch to SD card 1"
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
LIST_DIR_INTO_FILE = "List directory, dump output into file"
LIST_DIR_DUMP_DIRECTLY = "List directory, dump content directly"
CP_HELPER = "Filesystem Copy Helper"
MV_HELPER = "Filesystem Move Helper"
RM_HELPER = "Filesystem Removal Helper"
MKDIR_HELPER = "Filesystem Directory Creation Helper"
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
ENABLE_SCHEDULER = "Enable scheduler"
DISABLE_SCHEDULER = "Disable scheduler"
class Chip(enum.IntEnum):
CHIP_0 = 0
CHIP_1 = 1
NONE = 2
class Copy(enum.IntEnum):
COPY_0_NOM = 0
COPY_1_GOLD = 1
NONE = 2
class SystemctlCmd(enum.IntEnum):
START = 0
STOP = 1
RESTART = 2
@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION)
oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE)
oce.add(keys=OpCode.ANNOUNCE_BOOT_COUNTS, info=Info.ANNOUNCE_BOOT_COUNTS)
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self")
oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0")
oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
oce.add(
keys=OpCode.READ_REBOOT_MECHANISM_INFO, info=Info.READ_REBOOT_MECHANISM_INFO
)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCode.AUTO_SWITCH_ENABLE, info=Info.AUTO_SWITCH_ENABLE)
oce.add(keys=OpCode.AUTO_SWITCH_DISABLE, info=Info.AUTO_SWITCH_DISABLE)
oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR)
oce.add(
keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING
)
oce.add(
keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING,
info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING,
)
oce.add(
keys=OpCode.GET_HK,
info="Request housekeeping set",
)
oce.add(
keys=OpCode.ENABLE_REBOOT_FILE_HANDLING,
info="Enable reboot file handling",
)
oce.add(
keys=OpCode.DISABLE_REBOOT_FILE_HANDLING,
info="Disable reboot file handling",
)
oce.add(
keys=OpCode.RESET_ALL_REBOOT_COUNTERS,
info="Reset all reboot counters",
)
oce.add(
keys=OpCode.RWD_RESET_REBOOT_COUNTER_00,
info="Reset reboot counter 0 0",
)
oce.add(
keys=OpCode.RWD_RESET_REBOOT_COUNTER_01,
info="Reset reboot counter 0 1",
)
oce.add(
keys=OpCode.RWD_RESET_REBOOT_COUNTER_10,
info="Reset reboot counter 1 0",
)
oce.add(
keys=OpCode.RWD_RESET_REBOOT_COUNTER_11,
info="Reset reboot counter 1 1",
)
oce.add(
keys=OpCode.RWD_SET_MAX_REBOOT_CNT,
info="Reset max reboot count for reboot watchdog",
)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)
oce.add(keys=OpCode.LIST_DIR_INTO_FILE, info=Info.LIST_DIR_INTO_FILE)
oce.add(keys=OpCode.LIST_DIR_DUMP_DIRECTLY, info=Info.LIST_DIR_DUMP_DIRECTLY)
oce.add(keys=OpCode.MV_HELPER, info=Info.MV_HELPER)
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
oce.add(keys=OpCode.ENABLE_SCHEDULER, info=Info.ENABLE_SCHEDULER)
oce.add(keys=OpCode.DISABLE_SCHEDULER, info=Info.DISABLE_SCHEDULER)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands( # noqa C901
q: DefaultPusQueueHelper, op_code: str
): # noqa: C901 , complexity okay here
if op_code == OpCode.ANNOUNCE_VERSION:
q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
elif op_code == OpCode.ANNOUNCE_CURRENT_IMAGE:
q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
)
elif op_code == OpCode.ANNOUNCE_BOOT_COUNTS:
q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
)
elif op_code in OpCode.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params()
add_xsc_reboot_cmd(
q=q,
reboot_self=reboot_self,
chip=chip_select,
copy=copy_select,
)
elif op_code in OpCode.REBOOT_FULL:
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
)
elif op_code in OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True)
elif op_code == OpCode.SYSTEMCTL_CMD_EXECUTOR:
print("systemctl command types: ")
for entry in SystemctlCmd:
print(f"{entry}: {entry.name}")
systemctl_cmd = SystemctlCmd(
int(input("Specify systemctl command type by key: "))
)
unit_name = input("Specify unit name: ")
q.add_pus_tc(create_systemctl_cmd(systemctl_cmd, unit_name))
elif op_code == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif op_code == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif op_code in OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
)
elif op_code in OpCode.XSC_REBOOT_0_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD,
)
elif op_code in OpCode.XSC_REBOOT_1_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
)
elif op_code in OpCode.XSC_REBOOT_1_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD,
)
elif op_code == OpCode.READ_REBOOT_MECHANISM_INFO:
q.add_log_cmd(Info.READ_REBOOT_MECHANISM_INFO)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.READ_REBOOT_MECHANISM_INFO,
)
)
elif op_code == OpCode.DISABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Disabling reboot file handling")
user_data = bytearray([0])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif op_code == OpCode.ENABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Enabling reboot file handling")
user_data = bytearray([1])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif op_code == OpCode.RESET_ALL_REBOOT_COUNTERS:
q.add_log_cmd("Resetting all reboot counters")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
)
)
elif op_code == OpCode.RWD_RESET_REBOOT_COUNTER_00:
reset_specific_boot_counter(q, 0, 0)
elif op_code == OpCode.RWD_RESET_REBOOT_COUNTER_01:
reset_specific_boot_counter(q, 0, 1)
elif op_code == OpCode.RWD_RESET_REBOOT_COUNTER_10:
reset_specific_boot_counter(q, 1, 0)
elif op_code == OpCode.RWD_RESET_REBOOT_COUNTER_11:
reset_specific_boot_counter(q, 1, 1)
elif op_code == OpCode.RWD_SET_MAX_REBOOT_CNT:
max_count = int(input("Set new maximum reboot threshold [1, 50]: "))
if max_count < 1 or max_count > 50:
raise ValueError("Invalid value, must be in range 1 to 50")
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID,
ActionId.SET_MAX_REBOOT_CNT,
user_data=bytes([max_count]),
)
)
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_1:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
elif op_code in OpCode.OBSW_UPDATE_FROM_TMP:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
elif op_code in OpCode.AUTO_SWITCH_ENABLE:
q.add_log_cmd(Info.AUTO_SWITCH_ENABLE)
chip, copy = determine_chip_and_copy()
user_data = bytes([chip, copy])
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_ENABLE, user_data
)
)
elif op_code in OpCode.AUTO_SWITCH_DISABLE:
q.add_log_cmd(Info.AUTO_SWITCH_DISABLE)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.AUTO_SWITCH_DISABLE)
)
elif op_code in OpCode.SWITCH_TO_SD_0:
q.add_log_cmd(Info.SWITCH_TO_SD_0)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
)
)
elif op_code in OpCode.SWITCH_TO_SD_1:
q.add_log_cmd(Info.SWITCH_TO_SD_1)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
)
)
elif op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True:
active_sd_card = int(input("Please specify active SD card [0/1]: "))
if active_sd_card not in [0, 1]:
_LOGGER.warning("Invalid SD card specified. Try again")
break
q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
user_data=bytes([active_sd_card]),
)
)
elif op_code in OpCode.GET_HK:
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif op_code in OpCode.SET_PREF_SD:
q.add_log_cmd("Set preferred SD card")
pref_sd = int(
input("Specify which SD card to set as the preferred one (0/1): ")
)
if pref_sd not in [0, 1]:
raise ValueError("Only 0 or 1 allowed for preferred SD card")
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u8_parameter(
object_id=CORE_CONTROLLER_ID,
domain_id=0,
unique_id=ParamId.PREF_SD,
parameter=pref_sd,
)
)
)
elif op_code == OpCode.CP_HELPER:
cp_recursive = int(input("Copy recursively (0/1) ?: "))
if cp_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
cp_force = int(input("Copy with force option(0/1) ?: "))
if cp_force not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([cp_recursive, cp_force])
user_data.extend(packet_source_dest_path("Copy"))
q.add_log_cmd(Info.CP_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.CP_HELPER, user_data)
)
elif op_code == OpCode.MV_HELPER:
user_data = packet_source_dest_path("Move")
q.add_log_cmd(Info.MV_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MV_HELPER, user_data)
)
elif op_code == OpCode.RM_HELPER:
rm_recursive = int(input("Remove with recursive (-r) option (0/1) ?: "))
if rm_recursive not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
rm_force = int(input("Remove with force (-f) option (0/1) ?: "))
if rm_force not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
user_data = bytearray([rm_recursive, rm_force])
removed_file_or_dir = input("Specify absolute path to be removed: ")
user_data.extend(removed_file_or_dir.encode())
user_data.append(0)
q.add_log_cmd(Info.RM_HELPER)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.RM_HELPER, user_data)
)
elif op_code == OpCode.LIST_DIR_INTO_FILE:
q.add_log_cmd(Info.LIST_DIR_INTO_FILE)
user_data = list_directory_base_user_data()
dest_file_path = input("Destination file path: ")
user_data.extend(dest_file_path.encode())
user_data.append(0)
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_INTO_FILE, user_data
)
)
elif op_code == OpCode.LIST_DIR_DUMP_DIRECTLY:
q.add_log_cmd(Info.LIST_DIR_DUMP_DIRECTLY)
user_data = list_directory_base_user_data()
q.add_pus_tc(
create_action_cmd(
CORE_CONTROLLER_ID, ActionId.LIST_DIR_DUMP_DIRECTLY, user_data
)
)
elif op_code == OpCode.MKDIR_HELPER:
q.add_log_cmd(Info.MKDIR_HELPER)
user_data = input("Specify absolute path of newly created directory: ")
user_data = bytearray(user_data.encode())
user_data.append(0)
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
)
elif op_code == OpCode.ENABLE_SCHEDULER:
q.add_log_cmd(Info.ENABLE_SCHEDULER)
q.add_pus_tc(create_enable_tc_sched_cmd())
elif op_code == OpCode.DISABLE_SCHEDULER:
q.add_log_cmd(Info.DISABLE_SCHEDULER)
q.add_pus_tc(create_disable_tc_sched_cmd())
else:
_LOGGER.warning(
f"Unknown operation code {op_code} for core controller commands"
)
def create_systemctl_cmd(systemctl_cmd: SystemctlCmd, unit_name: str):
cmd_data = bytearray([systemctl_cmd])
cmd_data.extend(unit_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR,
user_data=cmd_data,
)
def list_directory_base_user_data() -> bytearray:
all_opt = int(input("Use all (-a) option (0/1) ?: "))
if all_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
recursive_opt = int(input("Use recursive (-R) option (0/1) ?: "))
if recursive_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
compression_opt = int(input("Compress target file (0/1) ?: "))
if compression_opt not in [0, 1]:
raise ValueError("Invalid value, only 0 or 1 allowed")
listing_path = input("Specify listing path (absolute path): ")
user_data = bytearray([all_opt, recursive_opt, compression_opt])
user_data.extend(listing_path.encode())
user_data.append(0)
return user_data
def packet_source_dest_path(context: str) -> bytes:
source = input(f"Specify {context} source file: ")
dest = input(f"Specify {context} destination file: ")
raw_src_dest = bytearray(source.encode())
raw_src_dest.append(0)
raw_src_dest.extend(dest.encode())
raw_src_dest.append(0)
return raw_src_dest
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
user_data=bytes([chip, copy]),
)
)
def create_full_reboot_cmds() -> PusTelecommand:
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
def determine_reboot_params() -> Tuple[bool, Chip, Copy]:
reboot_self = input("Reboot self? [y/n]: ")
if reboot_self in ["y", "yes", "1"]:
_LOGGER.info("Rebooting currently running image")
return True, Chip.NONE, Copy.NONE
_LOGGER.info("Rebooting image specified by chip and copy")
chip, copy = determine_chip_and_copy()
return False, chip, copy
def determine_chip_and_copy() -> Tuple[Chip, Copy]:
while True:
chip_select = input("Chip select [0/1]: ")
if chip_select in ["0", "1"]:
if chip_select == "0":
chip_select = Chip.CHIP_0
else:
chip_select = Chip.CHIP_1
break
else:
_LOGGER.warning("Invalid chip select value. Try again")
while True:
copy_select = input("Copy select [0/1]: ")
if copy_select in ["0", "1"]:
if copy_select == "0":
copy_select = Copy.COPY_0_NOM
else:
copy_select = Copy.COPY_1_GOLD
break
else:
_LOGGER.warning("Invalid copy select value. Try again")
return chip_select, copy_select
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
chip, copy = determine_chip_and_copy()
user_data = bytearray([chip, copy])
custom_file_name = input("Use custom filename [y/n] ?: ")
if custom_file_name.lower() in ["y", "yes", "1"]:
custom_file_name = input("Specify custom filename: ")
user_data.extend(custom_file_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data
)
def add_xsc_reboot_cmd(
q: DefaultPusQueueHelper,
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
):
if reboot_self:
q.add_log_cmd("Packing reboot command for current image")
else:
q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}")
q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy))
def create_xsc_reboot_cmds(
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
) -> PusTelecommand:
tc_data = bytearray()
if reboot_self:
tc_data.append(True)
else:
tc_data.append(False)
tc_data.append(chip)
tc_data.append(copy)
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data
)
def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.HK:
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage}"
)
pw.dlog(printout)
FsfwTmTcPrinter.get_validity_buffer(
validity_buffer=hk_data[inc_len:], num_vars=3
)
def handle_core_ctrl_action_replies(
action_id: int, pw: PrintWrapper, custom_data: bytes
):
if action_id == ActionId.READ_REBOOT_MECHANISM_INFO:
handle_reboot_mechanism_info_reply(pw, custom_data)
elif action_id == ActionId.LIST_DIR_DUMP_DIRECTLY:
handle_list_dir_dump_reply(pw, custom_data)
def handle_reboot_mechanism_info_reply(pw: PrintWrapper, custom_data: bytes):
pw.dlog("Received reboot mechansm information")
fmt_str = "!BIIIIIBBBBBBBB"
inc_len = struct.calcsize(fmt_str)
if len(custom_data) < inc_len:
raise ValueError(f"Received custom data shorter than expected {inc_len}")
(
enabled,
max_count,
img00_count,
img01_count,
img10_count,
img11_count,
img00_lock,
img01_lock,
img10_lock,
img11_lock,
last_chip,
last_copy,
next_chip,
next_copy,
) = struct.unpack(fmt_str, custom_data[:inc_len])
pw.dlog(f"Enabled: {enabled}")
pw.dlog(f"Max Count: {max_count}")
pw.dlog(f"Count 00: {img00_count}")
pw.dlog(f"Count 01: {img01_count}")
pw.dlog(f"Count 10: {img10_count}")
pw.dlog(f"Count 11: {img11_count}")
pw.dlog(f"Lock 00: {img00_lock}")
pw.dlog(f"Lock 01: {img01_lock}")
pw.dlog(f"Lock 10: {img10_lock}")
pw.dlog(f"Lock 11: {img11_lock}")
pw.dlog(f"Last Chip: {last_chip}")
pw.dlog(f"Last Copy: {last_copy}")
pw.dlog(f"Next Chip: {next_chip}")
pw.dlog(f"Next Copy: {next_copy}")
def handle_list_dir_dump_reply(pw: PrintWrapper, custom_data: bytes):
if len(custom_data) < 4:
_LOGGER.warning("Data unexpectedly small")
return
seq_idx = struct.unpack("!I", custom_data[0:4])[0]
total_chunks = struct.unpack("!I", custom_data[4:8])[0]
compressed = custom_data[8]
ls_cmd = custom_data[9:].split(b"\x00")[0].decode()
# Include length of NULL termination
file_data_offset = 9 + len(ls_cmd) + 1
pw.dlog(
f"Received directory listing dump for ls command {ls_cmd}. "
f"Chunk {seq_idx + 1}/{total_chunks}"
)
def remove_if_exists_and_new(seq_idx_: int, path_: Path):
if seq_idx_ == 0 and path_.exists():
os.remove(path_)
if compressed:
path = Path("dir_listing.txt.gz")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(
f"Compression option: {compressed}. Dumping file into dir_listing.txt.gz"
)
with open(path, "ab") as listing_file:
listing_file.write(custom_data[file_data_offset:])
else:
path = Path("dir_listing.txt")
remove_if_exists_and_new(seq_idx, path)
pw.dlog(f"Compression option: {compressed}. Dumping file into dir_listing.txt")
with open(path, "a") as listing_file:
listing_file_str = custom_data[file_data_offset:].decode()
listing_file.write(listing_file_str)
if seq_idx + 1 == total_chunks:
pw.dlog("Full directory listing: ")
with open("dir_listing.txt", "r") as listing_file:
print(listing_file.read())