383 lines
13 KiB
Python
383 lines
13 KiB
Python
import enum
|
|
import logging
|
|
import struct
|
|
|
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from spacepackets.ecss import PusTelecommand
|
|
from tmtccmd.config import TmtcDefinitionWrapper
|
|
|
|
from tmtccmd.tc import DefaultPusQueueHelper
|
|
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
|
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
|
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
|
|
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
|
|
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
|
|
|
# Module-level logger named after this module; used for the interactive
# prompt feedback (info/warning messages) further down in this file.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class ActionId(enum.IntEnum):
    """Action IDs used as ``action_id`` when building core controller action commands.

    The numeric values are presumably dictated by the on-board software and
    must not be renumbered -- TODO confirm against the OBSW definitions.
    """

    # Announcements / directory listing.
    LIST_DIR_INTO_FILE = 0
    ANNOUNCE_VERSION = 1
    ANNOUNCE_CURRENT_IMAGE = 2

    # Reboot file handling and reboot counters.
    SWITCH_REBOOT_FILE_HANDLING = 5
    RESET_REBOOT_COUNTER = 6
    SWITCH_IMG_LOCK = 7
    SET_MAX_REBOOT_CNT = 8

    # OBSW update sources.
    UPDATE_OBSW_FROM_SD_0 = 10
    UPDATE_OBSW_FROM_SD_1 = 11
    UPDATE_OBSW_FROM_TMP = 12

    # SD card selection.
    SWITCH_TO_SD_0 = 16
    SWITCH_TO_SD_1 = 17
    SWITCH_TO_BOTH_SD_CARDS = 18

    # Reboots.
    XSC_REBOOT = 32
    FULL_REBOOT = 34
|
|
|
|
|
|
class SetId(enum.IntEnum):
    """Housekeeping set IDs of the core controller.

    ``HK`` is the set requested via ``make_sid`` and the only set decoded by
    ``handle_core_hk_data`` in this file.
    """

    HK = 5
|
|
|
|
|
|
class OpCode:
    """Operation code keys accepted for the core controller service.

    Two flavours exist in this class and they are matched differently by
    ``pack_core_commands``:

    * plain strings (``ANNOUNCE_*``) are compared with ``==``;
    * lists of aliases (everything else) are matched with ``in``, so every
      entry of a list selects the same command.
    """

    # Plain string keys (compared with ``==``).
    ANNOUNCE_VERSION = "announce_version"
    ANNOUNCE_CURRENT_IMAGE = "announce_current_image"

    # Reboot commands.
    REBOOT_XSC = ["0", "reboot_xsc"]
    XSC_REBOOT_SELF = ["1", "reboot_self"]
    XSC_REBOOT_0_0 = ["2", "reboot_00"]
    XSC_REBOOT_0_1 = ["3", "reboot_01"]
    XSC_REBOOT_1_0 = ["4", "reboot_10"]
    XSC_REBOOT_1_1 = ["5", "reboot_11"]
    REBOOT_FULL = ["6", "reboot_regular"]

    # Housekeeping.
    GET_HK = ["7", "get_hk"]

    # OBSW update sources.
    OBSW_UPDATE_FROM_SD_0 = ["obsw_update_sd0"]
    OBSW_UPDATE_FROM_SD_1 = ["obsw_update_sd1"]
    OBSW_UPDATE_FROM_TMP = ["obsw_update_tmp"]

    # SD card selection.
    SWITCH_TO_SD_0 = ["switch_to_sd_0"]
    SWITCH_TO_SD_1 = ["switch_to_sd_1"]
    SWITCH_TO_BOTH_SD_CARDS = ["switch_to_both_sd_cards"]

    # Reboot file handling. The mnemonic aliases used to be inverted
    # ("rbh_off" enabled, "rbh_on" disabled); they now match the action that
    # is actually sent. The numeric aliases are unchanged.
    ENABLE_REBOOT_FILE_HANDLING = ["32", "rbh_on"]
    DISABLE_REBOOT_FILE_HANDLING = ["33", "rbh_off"]
    RESET_ALL_REBOOT_COUNTERS = ["34", "rbh_reset_a"]
    RESET_REBOOT_COUNTER_00 = ["35", "rbh_reset_00"]
    RESET_REBOOT_COUNTER_01 = ["36", "rbh_reset_01"]
    RESET_REBOOT_COUNTER_10 = ["37", "rbh_reset_10"]
    RESET_REBOOT_COUNTER_11 = ["38", "rbh_reset_11"]
    # NOTE(review): no registration or handler for this op code is visible in
    # this file -- confirm whether it is still pending implementation.
    SET_MAX_REBOOT_CNT = ["39", "rbh_max_cnt"]
|
|
|
|
|
|
class Info:
    """Human-readable descriptions for a subset of the core controller op codes.

    The strings are used both as op code info text when registering the
    service and as log lines when the matching command is packed.
    """

    # Announcements.
    ANNOUNCE_VERSION = "Announce version"
    ANNOUNCE_CURRENT_IMAGE = "Announce current image"

    # Reboots.
    REBOOT_XSC = "XSC reboot with prompt"
    REBOOT_FULL = "Full regular reboot"

    # OBSW updates.
    OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
    OBSW_UPDATE_FROM_SD_1 = "Update OBSW from SD Card 1"
    OBSW_UPDATE_FROM_TMP = "Update OBSW from tmp folder"

    # SD card selection.
    SWITCH_TO_SD_0 = "Switch to SD card 0"
    SWITCH_TO_SD_1 = "Switch to SD card 1"
    SWITCH_TO_BOTH_SD_CARDS = "Switch to both SD cards with specified active card"
|
|
|
|
|
|
class Chip(enum.IntEnum):
    """Chip selector for reboot and update commands.

    ``NONE`` is the default of the XSC reboot helpers in this file when no
    explicit chip is passed.
    """

    CHIP_0 = 0
    CHIP_1 = 1
    NONE = 2
|
|
|
|
|
|
class Copy(enum.IntEnum):
    """Image copy selector for reboot and update commands.

    ``NONE`` is the default of the XSC reboot helpers in this file when no
    explicit copy is passed.
    """

    COPY_0_NOM = 0
    COPY_1_GOLD = 1
    NONE = 2
|
|
|
|
|
|
@tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
    """Register the core controller op codes with the TMTC definitions.

    Builds one ``OpCodeEntry`` holding every op code together with its info
    text and adds it to ``defs`` as the "Core Controller" service.

    Each op code is registered exactly once; the redundant second
    registrations of ``REBOOT_XSC`` and the ``OBSW_UPDATE_*`` op codes were
    removed.

    :param defs: Definition wrapper the service entry is added to.
    """
    oce = OpCodeEntry()

    # Announcements.
    oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION)
    oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE)

    # Reboots.
    oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
    oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
    oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self")
    oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0")
    oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
    oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
    oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")

    # OBSW updates.
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
    oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)

    # Housekeeping.
    oce.add(keys=OpCode.GET_HK, info="Request housekeeping set")

    # Reboot file handling and reboot counters.
    oce.add(
        keys=OpCode.ENABLE_REBOOT_FILE_HANDLING,
        info="Enable reboot file handling",
    )
    oce.add(
        keys=OpCode.DISABLE_REBOOT_FILE_HANDLING,
        info="Disable reboot file handling",
    )
    oce.add(keys=OpCode.RESET_ALL_REBOOT_COUNTERS, info="Reset all reboot counters")
    oce.add(keys=OpCode.RESET_REBOOT_COUNTER_00, info="Reset reboot counter 0 0")
    oce.add(keys=OpCode.RESET_REBOOT_COUNTER_01, info="Reset reboot counter 0 1")
    oce.add(keys=OpCode.RESET_REBOOT_COUNTER_10, info="Reset reboot counter 1 0")
    oce.add(keys=OpCode.RESET_REBOOT_COUNTER_11, info="Reset reboot counter 1 1")

    # SD card selection.
    oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
    oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
    oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)

    defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
|
|
|
|
|
|
def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
    """Queue the core controller telecommand selected by ``op_code``.

    For the matching op code a log entry and the corresponding PUS
    telecommand are appended to ``q``. Some op codes prompt the operator on
    stdin (XSC reboot with prompt, OBSW update, switching to both SD cards).
    An op code matching none of the known keys queues nothing.

    :param q: Queue helper receiving the log entries and telecommands.
    :param op_code: Op code key; see :class:`OpCode` for accepted values.
    """
    # The ANNOUNCE_* op codes are plain strings, hence ``==`` instead of ``in``.
    if op_code == OpCode.ANNOUNCE_VERSION:
        q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
        q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
    if op_code == OpCode.ANNOUNCE_CURRENT_IMAGE:
        q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
        q.add_pus_tc(
            create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
        )
    if op_code in OpCode.REBOOT_XSC:
        reboot_self, chip_select, copy_select = determine_reboot_params()
        add_xsc_reboot_cmd(
            q=q,
            reboot_self=reboot_self,
            chip=chip_select,
            copy=copy_select,
        )
    if op_code in OpCode.REBOOT_FULL:
        q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
            )
        )
    if op_code in OpCode.XSC_REBOOT_SELF:
        add_xsc_reboot_cmd(q=q, reboot_self=True)
    if op_code in OpCode.XSC_REBOOT_0_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
        )
    if op_code in OpCode.XSC_REBOOT_0_1:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_1_GOLD
        )
    if op_code in OpCode.XSC_REBOOT_1_0:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
        )
    if op_code in OpCode.XSC_REBOOT_1_1:
        add_xsc_reboot_cmd(
            q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_1_GOLD
        )
    if op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Disabling reboot file handling")
        # Single payload byte: 0 disables, 1 enables.
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=bytearray([0]),
            )
        )
    if op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
        q.add_log_cmd("Enabling reboot file handling")
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
                user_data=bytearray([1]),
            )
        )
    if op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
        q.add_log_cmd("Resetting all reboot counters")
        # Same action as a specific reset, but sent without user data.
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.RESET_REBOOT_COUNTER,
            )
        )
    if op_code in OpCode.RESET_REBOOT_COUNTER_00:
        reset_specific_boot_counter(q, 0, 0)
    if op_code in OpCode.RESET_REBOOT_COUNTER_01:
        reset_specific_boot_counter(q, 0, 1)
    if op_code in OpCode.RESET_REBOOT_COUNTER_10:
        reset_specific_boot_counter(q, 1, 0)
    if op_code in OpCode.RESET_REBOOT_COUNTER_11:
        reset_specific_boot_counter(q, 1, 1)
    if op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
    if op_code in OpCode.OBSW_UPDATE_FROM_SD_1:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
    if op_code in OpCode.OBSW_UPDATE_FROM_TMP:
        q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
        q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
    if op_code in OpCode.SWITCH_TO_SD_0:
        q.add_log_cmd(Info.SWITCH_TO_SD_0)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
            )
        )
    if op_code in OpCode.SWITCH_TO_SD_1:
        q.add_log_cmd(Info.SWITCH_TO_SD_1)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
            )
        )
    if op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS:
        active_sd_card = _prompt_active_sd_card()
        q.add_log_cmd(Info.SWITCH_TO_BOTH_SD_CARDS)
        q.add_pus_tc(
            create_action_cmd(
                object_id=CORE_CONTROLLER_ID,
                action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
                user_data=bytes([active_sd_card]),
            )
        )
    if op_code in OpCode.GET_HK:
        q.add_log_cmd("Requesting housekeeping set")
        sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
        q.add_pus_tc(generate_one_hk_command(sid))


def _prompt_active_sd_card() -> int:
    """Ask the operator for the active SD card until 0 or 1 is entered.

    Previously the prompt loop exited after the first answer, so an invalid
    value was only warned about and then sent anyway, and a non-numeric
    answer crashed with ``ValueError``. Now every invalid answer triggers a
    warning and a new prompt.

    :return: The selected SD card, either 0 or 1.
    """
    while True:
        answer = input("Please specify active SD card [0/1]: ")
        try:
            active_sd_card = int(answer)
        except ValueError:
            # Non-numeric input is handled like any other invalid selection.
            active_sd_card = None
        if active_sd_card in (0, 1):
            return active_sd_card
        _LOGGER.warning("Invalid SD card specified. Try again")
|
|
|
|
|
|
def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
    """Queue a reboot counter reset for one chip/copy combination.

    Appends a log entry and a ``RESET_REBOOT_COUNTER`` action command whose
    user data is the two bytes ``[chip, copy]``.

    :param q: Queue helper receiving the log entry and telecommand.
    :param chip: Chip index, packed as the first payload byte.
    :param copy: Copy index, packed as the second payload byte.
    """
    q.add_log_cmd(f"Resetting boot counter {chip} {copy}")
    counter_selection = bytes([chip, copy])
    reset_cmd = create_action_cmd(
        object_id=CORE_CONTROLLER_ID,
        action_id=ActionId.RESET_REBOOT_COUNTER,
        user_data=counter_selection,
    )
    q.add_pus_tc(reset_cmd)
|
|
|
|
|
|
def create_full_reboot_cmds() -> PusTelecommand:
    """Build the ``FULL_REBOOT`` action command for the core controller.

    :return: The telecommand; it carries no user data.
    """
    full_reboot_cmd = create_action_cmd(
        object_id=CORE_CONTROLLER_ID,
        action_id=ActionId.FULL_REBOOT,
    )
    return full_reboot_cmd
|
|
|
|
|
|
def determine_reboot_params() -> "tuple[bool, int, int]":
    """Ask the operator which image an XSC reboot should target.

    The operator is first asked whether the currently running image should
    be rebooted. If not, chip and copy are queried through
    ``determine_chip_and_copy``.

    The result is always a flat 3-tuple. Previously the chip/copy branch
    returned ``(False, (chip, copy))``, which made callers that unpack three
    values fail with ``ValueError``.

    :return: ``(reboot_self, chip_select, copy_select)``. When
        ``reboot_self`` is ``True`` chip and copy are the placeholder ``-1``.
    """
    answer = input("Reboot self? [y/n]: ")
    # Accept the affirmative answers regardless of case and surrounding spaces.
    if answer.strip().lower() in ("y", "yes", "1"):
        _LOGGER.info("Rebooting currently running image")
        # Placeholders kept at -1 for backward compatibility.
        return True, -1, -1
    _LOGGER.info("Rebooting image specified by chip and copy")
    chip_select, copy_select = determine_chip_and_copy()
    return False, chip_select, copy_select
|
|
|
|
|
|
def determine_chip_and_copy() -> (int, int):
    """Prompt the operator for a chip and a copy selection.

    Each prompt is repeated, with a warning logged in between, until the
    answer is exactly ``"0"`` or ``"1"``. The chip is asked first, then the
    copy.

    :return: ``(chip, copy)`` as :class:`Chip` and :class:`Copy` members.
    """
    chip_by_answer = {"0": Chip.CHIP_0, "1": Chip.CHIP_1}
    copy_by_answer = {"0": Copy.COPY_0_NOM, "1": Copy.COPY_1_GOLD}

    # ``is None`` is used on purpose: the "0" members are falsy integers.
    chip = None
    while chip is None:
        chip = chip_by_answer.get(input("Chip select [0/1]: "))
        if chip is None:
            _LOGGER.warning("Invalid chip select value. Try again")

    copy = None
    while copy is None:
        copy = copy_by_answer.get(input("Copy select [0/1]: "))
        if copy is None:
            _LOGGER.warning("Invalid copy select value. Try again")

    return chip, copy
|
|
|
|
|
|
def pack_obsw_update_cmd(action_id: int) -> PusTelecommand:
    """Build an OBSW update action command for an interactively chosen target.

    Chip and copy are queried from the operator through
    ``determine_chip_and_copy`` and packed as the two user data bytes
    ``[chip, copy]``.

    :param action_id: Action ID placed in the command.
    :return: The action telecommand for the core controller.
    """
    chip, copy = determine_chip_and_copy()
    target = bytes((chip, copy))
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID,
        action_id=action_id,
        user_data=target,
    )
|
|
|
|
|
|
def add_xsc_reboot_cmd(
    q: DefaultPusQueueHelper,
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
):
    """Queue an XSC reboot command together with a descriptive log entry.

    :param q: Queue helper receiving the log entry and telecommand.
    :param reboot_self: If true, the command targets the current image and
        the log entry does not mention ``chip`` and ``copy``.
    :param chip: Chip to reboot when ``reboot_self`` is false.
    :param copy: Copy to reboot when ``reboot_self`` is false.
    """
    log_line = (
        "Packing reboot command for current image"
        if reboot_self
        else f"Packing reboot command for chip {chip} and copy {copy}"
    )
    q.add_log_cmd(log_line)
    reboot_cmd = create_xsc_reboot_cmds(reboot_self, chip, copy)
    q.add_pus_tc(reboot_cmd)
|
|
|
|
|
|
def create_xsc_reboot_cmds(
    reboot_self: bool,
    chip: Chip = Chip.NONE,
    copy: Copy = Copy.NONE,
) -> PusTelecommand:
    """Build the ``XSC_REBOOT`` action command.

    The user data is a single ``1`` byte when ``reboot_self`` is true;
    otherwise it is the three bytes ``[0, chip, copy]``.

    :param reboot_self: Whether the currently running image is rebooted.
    :param chip: Chip selector, only packed when ``reboot_self`` is false.
    :param copy: Copy selector, only packed when ``reboot_self`` is false.
    :return: The action telecommand for the core controller.
    """
    if reboot_self:
        payload = bytearray([True])
    else:
        payload = bytearray([False, chip, copy])
    return create_action_cmd(
        object_id=CORE_CONTROLLER_ID,
        action_id=ActionId.XSC_REBOOT,
        user_data=payload,
    )
|
|
|
|
|
|
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
    """Decode and print a core controller housekeeping set.

    Only ``SetId.HK`` is handled; any other set ID is ignored. The set is
    read as three network-byte-order 32-bit floats (temperature, PS voltage,
    PL voltage). Everything after them is passed on as the validity buffer.

    :param printer: Printer used for the log output and the validity buffer.
    :param set_id: ID of the received housekeeping set.
    :param hk_data: Raw housekeeping payload.
    """
    if set_id != SetId.HK:
        return
    pw = PrintWrapper(printer)
    hk_layout = "!fff"
    values_len = struct.calcsize(hk_layout)
    temperature, ps_voltage, pl_voltage = struct.unpack(
        hk_layout, hk_data[:values_len]
    )
    pw.dlog(
        f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
        f"PL Voltage [mV] {pl_voltage}"
    )
    printer.print_validity_buffer(validity_buffer=hk_data[values_len:], num_vars=3)
|