eive-tmtc/eive_tmtc/tmtc/core.py

486 lines
17 KiB
Python

import enum
import logging
import struct
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.tc.pus_20_fsfw_param import (
create_scalar_u8_parameter,
create_load_param_cmd,
)
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
_LOGGER = logging.getLogger(__name__)
class ActionId(enum.IntEnum):
LIST_DIR_INTO_FILE = 0
ANNOUNCE_VERSION = 1
ANNOUNCE_CURRENT_IMAGE = 2
ANNOUNCE_BOOT_COUNTS = 3
SWITCH_REBOOT_FILE_HANDLING = 5
RESET_REBOOT_COUNTER = 6
SWITCH_IMG_LOCK = 7
SET_MAX_REBOOT_CNT = 8
UPDATE_OBSW_FROM_SD_0 = 10
UPDATE_OBSW_FROM_SD_1 = 11
UPDATE_OBSW_FROM_TMP = 12
SWITCH_TO_SD_0 = 16
SWITCH_TO_SD_1 = 17
SWITCH_TO_BOTH_SD_CARDS = 18
XSC_REBOOT = 32
FULL_REBOOT = 34
EXECUTE_SHELL_CMD_BLOCKING = 40
EXECUTE_SHELL_CMD_NON_BLOCKING = 41
SYSTEMCTL_CMD_EXECUTOR = 42
class ParamId(enum.IntEnum):
PREF_SD = 0
class SetId(enum.IntEnum):
HK = 5
class OpCode:
ANNOUNCE_VERSION = "announce_version"
ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
ANNOUNCE_BOOT_COUNTS = "announce_boot_counts"
EXECUTE_SHELL_CMD_BLOCKING = "exec_cmd_blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "exec_cmd_non_blocking"
SYSTEMCTL_CMD_EXECUTOR = "systemctl_cmd"
SET_PREF_SD = "set_pref_sd"
REBOOT_XSC = ["reboot_xsc"]
XSC_REBOOT_SELF = ["reboot_self"]
XSC_REBOOT_0_0 = ["reboot_00"]
XSC_REBOOT_0_1 = ["reboot_01"]
XSC_REBOOT_1_0 = ["reboot_10"]
XSC_REBOOT_1_1 = ["reboot_11"]
REBOOT_FULL = ["reboot_regular"]
GET_HK = ["get_hk"]
OBSW_UPDATE_FROM_SD_0 = ["obsw_update_sd0"]
OBSW_UPDATE_FROM_SD_1 = ["obsw_update_sd1"]
OBSW_UPDATE_FROM_TMP = ["obsw_update_tmp"]
SWITCH_TO_SD_0 = ["switch_to_sd_0"]
SWITCH_TO_SD_1 = ["switch_to_sd_1"]
SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]
ENABLE_REBOOT_FILE_HANDLING = ["rbh_off"]
DISABLE_REBOOT_FILE_HANDLING = ["rbh_on"]
RESET_ALL_REBOOT_COUNTERS = ["rbh_reset_a"]
RESET_REBOOT_COUNTER_00 = ["rbh_reset_00"]
RESET_REBOOT_COUNTER_01 = ["rbh_reset_01"]
RESET_REBOOT_COUNTER_10 = ["rbh_reset_10"]
RESET_REBOOT_COUNTER_11 = ["rbh_reset_11"]
SET_MAX_REBOOT_CNT = ["rbh_max_cnt"]
class Info:
ANNOUNCE_VERSION = "Announce version"
ANNOUNCE_CURRENT_IMAGE = "Announce current image"
ANNOUNCE_BOOT_COUNTS = "Announce boot counts"
SYSTEMCTL_CMD_EXECUTOR = "Perform systemctl command"
EXECUTE_SHELL_CMD_BLOCKING = "Execute shell command blocking"
EXECUTE_SHELL_CMD_NON_BLOCKING = "Execute shell command non-blocking"
SET_PREF_SD = "Set preferred SD card"
REBOOT_XSC = "XSC reboot with prompt"
REBOOT_FULL = "Full regular reboot"
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"
SWITCH_TO_SD_0 = "Switch to SD card 0"
SWITCH_TO_SD_1 = "Switch to SD card 1"
SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
class Chip(enum.IntEnum):
CHIP_0 = 0
CHIP_1 = 1
NONE = 2
class Copy(enum.IntEnum):
COPY_0_NOM = 0
COPY_1_GOLD = 1
NONE = 2
class SystemctlCmd(enum.IntEnum):
START = 0
STOP = 1
RESTART = 2
@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION)
oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE)
oce.add(keys=OpCode.ANNOUNCE_BOOT_COUNTS, info=Info.ANNOUNCE_BOOT_COUNTS)
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self")
oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0")
oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(keys=OpCode.SET_PREF_SD, info=Info.SET_PREF_SD)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCode.SYSTEMCTL_CMD_EXECUTOR, info=Info.SYSTEMCTL_CMD_EXECUTOR)
oce.add(
keys=OpCode.EXECUTE_SHELL_CMD_BLOCKING, info=Info.EXECUTE_SHELL_CMD_BLOCKING
)
oce.add(
keys=OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING,
info=Info.EXECUTE_SHELL_CMD_NON_BLOCKING,
)
oce.add(
keys=OpCode.GET_HK,
info="Request housekeeping set",
)
oce.add(
keys=OpCode.ENABLE_REBOOT_FILE_HANDLING,
info="Enable reboot file handling",
)
oce.add(
keys=OpCode.DISABLE_REBOOT_FILE_HANDLING,
info="Disable reboot file handling",
)
oce.add(
keys=OpCode.RESET_ALL_REBOOT_COUNTERS,
info="Reset all reboot counters",
)
oce.add(
keys=OpCode.RESET_REBOOT_COUNTER_00,
info="Reset reboot counter 0 0",
)
oce.add(
keys=OpCode.RESET_REBOOT_COUNTER_01,
info="Reset reboot counter 0 1",
)
oce.add(
keys=OpCode.RESET_REBOOT_COUNTER_10,
info="Reset reboot counter 1 0",
)
oce.add(
keys=OpCode.RESET_REBOOT_COUNTER_11,
info="Reset reboot counter 1 1",
)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code == OpCode.ANNOUNCE_VERSION:
q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
elif op_code == OpCode.ANNOUNCE_CURRENT_IMAGE:
q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
)
elif op_code == OpCode.ANNOUNCE_BOOT_COUNTS:
q.add_log_cmd(f"{Info.ANNOUNCE_BOOT_COUNTS}")
q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_BOOT_COUNTS)
)
elif op_code in OpCode.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params()
add_xsc_reboot_cmd(
q=q,
reboot_self=reboot_self,
chip=chip_select,
copy=copy_select,
)
elif op_code in OpCode.REBOOT_FULL:
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
)
elif op_code in OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True)
elif op_code == OpCode.SYSTEMCTL_CMD_EXECUTOR:
print("systemctl command types: ")
for entry in SystemctlCmd:
print(f"{entry}: {entry.name}")
systemctl_cmd = SystemctlCmd(
int(input("Specify systemctl command type by key: "))
)
unit_name = input("Specify unit name: ")
cmd_data = bytearray([systemctl_cmd])
cmd_data.extend(unit_name.encode())
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SYSTEMCTL_CMD_EXECUTOR,
user_data=cmd_data,
)
)
elif op_code == OpCode.EXECUTE_SHELL_CMD_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif op_code == OpCode.EXECUTE_SHELL_CMD_NON_BLOCKING:
custom_cmd = input("Please specify command to execute: ")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.EXECUTE_SHELL_CMD_NON_BLOCKING,
user_data=custom_cmd.encode(),
)
)
elif op_code in OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
)
elif op_code in OpCode.XSC_REBOOT_0_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD,
)
elif op_code in OpCode.XSC_REBOOT_1_0:
add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
)
elif op_code in OpCode.XSC_REBOOT_1_1:
add_xsc_reboot_cmd(
q=q,
reboot_self=False,
chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD,
)
elif op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Disabling reboot file handling")
user_data = bytearray([0])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Enabling reboot file handling")
user_data = bytearray([1])
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data,
)
)
elif op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
q.add_log_cmd("Resetting all reboot counters")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
)
)
elif op_code in OpCode.RESET_REBOOT_COUNTER_00:
reset_specific_boot_counter(q, 0, 0)
elif op_code in OpCode.RESET_REBOOT_COUNTER_01:
reset_specific_boot_counter(q, 0, 1)
elif op_code in OpCode.RESET_REBOOT_COUNTER_10:
reset_specific_boot_counter(q, 1, 0)
elif op_code in OpCode.RESET_REBOOT_COUNTER_11:
reset_specific_boot_counter(q, 1, 1)
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
elif op_code in OpCode.OBSW_UPDATE_FROM_SD_1:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
elif op_code in OpCode.OBSW_UPDATE_FROM_TMP:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
elif op_code in OpCode.SWITCH_TO_SD_0:
q.add_log_cmd(Info.SWITCH_TO_SD_0)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
)
)
elif op_code in OpCode.SWITCH_TO_SD_1:
q.add_log_cmd(Info.SWITCH_TO_SD_1)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
)
)
elif op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True:
active_sd_card = int(input("Please specify active SD card [0/1]: "))
if active_sd_card not in [0, 1]:
_LOGGER.warning("Invalid SD card specified. Try again")
break
q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
user_data=bytes([active_sd_card]),
)
)
elif op_code in OpCode.GET_HK:
q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif op_code in OpCode.SET_PREF_SD:
q.add_log_cmd("Set preferred SD card")
pref_sd = int(
input("Specify which SD card to set as the preferred one (0/1): ")
)
if pref_sd not in [0, 1]:
raise ValueError("Only 0 or 1 allowed for preferred SD card")
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u8_parameter(
object_id=CORE_CONTROLLER_ID,
domain_id=0,
unique_id=ParamId.PREF_SD,
parameter=pref_sd,
).pack()
)
)
else:
_LOGGER.warning(
f"Unknown operation code {op_code} for core controller commands"
)
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
q.add_pus_tc(
create_action_cmd(
object_id=CORE_CONTROLLER_ID,
action_id=ActionId.RESET_REBOOT_COUNTER,
user_data=bytes([chip, copy]),
)
)
def create_full_reboot_cmds() -> PusTelecommand:
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
)
def determine_reboot_params() -> (bool, Chip, Copy):
chip_select = -1
copy_select = -1
reboot_self = input("Reboot self? [y/n]: ")
if reboot_self in ["y", "yes", "1"]:
_LOGGER.info("Rebooting currently running image")
return True, chip_select, copy_select
_LOGGER.info("Rebooting image specified by chip and copy")
return False, determine_chip_and_copy()
def determine_chip_and_copy() -> (int, int):
while True:
chip_select = input("Chip select [0/1]: ")
if chip_select in ["0", "1"]:
if chip_select == "0":
chip_select = Chip.CHIP_0
else:
chip_select = Chip.CHIP_1
break
else:
_LOGGER.warning("Invalid chip select value. Try again")
while True:
copy_select = input("Copy select [0/1]: ")
if copy_select in ["0", "1"]:
if copy_select == "0":
copy_select = Copy.COPY_0_NOM
else:
copy_select = Copy.COPY_1_GOLD
break
else:
_LOGGER.warning("Invalid copy select value. Try again")
return chip_select, copy_select
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
chip, copy = determine_chip_and_copy()
user_data = bytearray([chip, copy])
custom_file_name = input("Use custom filename [y/n] ?: ")
if custom_file_name.lower() in ["y", "yes", "1"]:
custom_file_name = input("Specify custom filename: ")
user_data.extend(custom_file_name.encode())
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=action_id, user_data=user_data
)
def add_xsc_reboot_cmd(
q: DefaultPusQueueHelper,
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
):
if reboot_self:
q.add_log_cmd("Packing reboot command for current image")
else:
q.add_log_cmd(f"Packing reboot command for chip {chip} and copy {copy}")
q.add_pus_tc(create_xsc_reboot_cmds(reboot_self, chip, copy))
def create_xsc_reboot_cmds(
reboot_self: bool,
chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE,
) -> PusTelecommand:
tc_data = bytearray()
if reboot_self:
tc_data.append(True)
else:
tc_data.append(False)
tc_data.append(chip)
tc_data.append(copy)
return create_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data
)
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetId.HK:
pw = PrintWrapper(printer)
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage}"
)
pw.dlog(printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3)