207 lines
6.2 KiB
Python
207 lines
6.2 KiB
Python
# -*- coding: utf-8 -*-
"""Command sequence to test the HeaterHandler
@author J. Meier
@date 30.01.2021
"""
|
|
import enum
|
|
|
|
from eive_tmtc.config.definitions import CustomServiceList
|
|
from eive_tmtc.config.object_ids import get_object_ids
|
|
from eive_tmtc.tmtc.tcs.defs import Heater
|
|
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
|
|
from tmtccmd.config.tmtc import tmtc_definitions_provider
|
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
|
from tmtccmd.util.obj_id import ObjectIdU32
|
|
from tmtccmd.pus.s201_fsfw_health import (
|
|
pack_set_health_cmd_data,
|
|
FsfwHealth,
|
|
Subservice,
|
|
)
|
|
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
|
from spacepackets.ecss.tc import PusTelecommand
|
|
|
|
|
|
# Human-readable heater location names. Presumably indexed by heater number:
# the order matches the menu printed by prompt_heater() -- TODO confirm.
HEATER_LOCATION = [
    "PLOC Processing Board",
    "PCDU PDU",
    "ACS Board",
    "OBC Board",
    "Camera",
    "Startracker",
    "DRO",
    "Syrlinks",
]
|
|
|
|
|
|
class OpCode:
    """Command strings (op code keys) understood by the heater service.

    Every attribute is a list of accepted key strings; a command string
    selects an operation when it is contained in the corresponding list.
    """

    # Switch a heater on or off.
    HEATER_CMD = ["switch_cmd"]
    # Set the health state of a heater to external control.
    HEATER_EXT_CTRL = ["set_ext_ctrl"]
    # Set the health state of a heater to faulty.
    HEATER_FAULTY_CMD = ["set_faulty"]
    # Set the health state of a heater to healthy.
    HEATER_HEALTHY_CMD = ["set_healthy"]
|
|
|
|
|
|
class Info:
    """Descriptive texts registered alongside the op codes of the same name."""

    HEATER_CMD = "Heater Switch Command"
    HEATER_EXT_CTRL = "Set to external control"
    HEATER_FAULTY_CMD = "Set to faulty"
    HEATER_HEALTHY_CMD = "Set to healthy"
|
|
|
|
|
|
# Command-source marker appended to heater switch commands. The OBSW needs it
# to tell external heater commands apart from internal ones.
COMMAND_SOURCE_PARAM_EXTERNAL = 1
|
|
|
|
|
|
class ActionIds(enum.IntEnum):
    """Action IDs used in action commands sent to the heater handler."""

    # Action ID of the heater switch command.
    SWITCH_HEATER = 0
|
|
|
|
|
|
@tmtc_definitions_provider
def add_heater_cmds(defs: TmtcDefinitionWrapper):
    """Register the heater service together with its op codes.

    :param defs: Definition wrapper the heater service is added to.
    """
    op_code_entry = OpCodeEntry()
    # The registration order is deliberate: switch, healthy, external
    # control, faulty.
    registrations = (
        (OpCode.HEATER_CMD, Info.HEATER_CMD),
        (OpCode.HEATER_HEALTHY_CMD, Info.HEATER_HEALTHY_CMD),
        (OpCode.HEATER_EXT_CTRL, Info.HEATER_EXT_CTRL),
        (OpCode.HEATER_FAULTY_CMD, Info.HEATER_FAULTY_CMD),
    )
    for op_code_keys, description in registrations:
        op_code_entry.add(keys=op_code_keys, info=description)
    defs.add_service(
        name=CustomServiceList.HEATER.value,
        info="Heater Device",
        op_code_entry=op_code_entry,
    )
|
|
|
|
|
|
def pack_heater_cmds(object_id: bytes, cmd_str: str, q: DefaultPusQueueHelper):
    """Queue the telecommands for one heater op code.

    Depending on ``cmd_str`` the operator is prompted for a heater (and for
    the switch command also for the on/off action), and the matching
    telecommand is added to the queue. Unknown command strings queue nothing.

    :param object_id: Object ID passed on to the heater switch action command.
    :param cmd_str: Op code key, compared against the lists in ``OpCode``.
    :param q: Queue helper receiving the log entries and telecommands.
    """
    if cmd_str in OpCode.HEATER_CMD:
        q.add_log_cmd("Heater Switching")
        heater_number = prompt_heater()
        action = _prompt_switch_action()
        act_str = "on" if action == 1 else "off"
        q.add_log_cmd(f"Switching heater {heater_number} {act_str}")
        q.add_pus_tc(pack_switch_heater_command(object_id, heater_number, action))

    # The three health commands differ only in the target health state and in
    # the text used for the log entry.
    health_commands = (
        (OpCode.HEATER_EXT_CTRL, FsfwHealth.EXTERNAL_CTRL, "External Control"),
        (OpCode.HEATER_FAULTY_CMD, FsfwHealth.FAULTY, "Faulty"),
        (OpCode.HEATER_HEALTHY_CMD, FsfwHealth.HEALTHY, "Healthy"),
    )
    for op_code_keys, health, health_str in health_commands:
        if cmd_str not in op_code_keys:
            continue
        heater_number = prompt_heater()
        health_cmd(
            q=q,
            object_id=heater_idx_to_obj(heater_number),
            health=health,
            health_str=health_str,
            heater_idx=heater_number,
        )


def _prompt_switch_action() -> int:
    """Ask for a switch action until 0 (off) or 1 (on) is entered.

    :return: 0 to switch the heater off, 1 to switch it on.
    """
    while True:
        answer = input("Turn switch on or off? (0 - off, 1 - on): ")
        # isdecimal() instead of isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with a ValueError.
        if not answer.isdecimal():
            print("Switch action not valid")
            continue
        action = int(answer)
        if action not in (0, 1):
            print("Invalid action defined. Must be 0 (off) or 1 (on)")
            continue
        return action
|
|
|
|
|
|
def heater_idx_to_obj(heater: int) -> ObjectIdU32:
    """Return the object ID belonging to a heater index.

    The raw heater object ID is looked up in the dictionary returned by
    ``get_object_ids()``. If there is no entry for it, an ``ObjectIdU32`` is
    created from the raw ID instead.

    :param heater: Index into the table of the eight heater object IDs.
    :return: Object ID of the selected heater.
    """
    # Function-scope import of the raw heater object IDs.
    from eive_tmtc.config.object_ids import (
        HEATER_0_PLOC_PROC_BRD,
        HEATER_1_PCDU_BRD,
        HEATER_2_ACS_BRD,
        HEATER_3_OBC_BRD,
        HEATER_4_CAMERA,
        HEATER_5_STR,
        HEATER_6_DRO,
        HEATER_7_SYRLINKS,
    )

    # Table position equals the heater index.
    raw_ids = (
        HEATER_0_PLOC_PROC_BRD,
        HEATER_1_PCDU_BRD,
        HEATER_2_ACS_BRD,
        HEATER_3_OBC_BRD,
        HEATER_4_CAMERA,
        HEATER_5_STR,
        HEATER_6_DRO,
        HEATER_7_SYRLINKS,
    )
    known_ids = get_object_ids()
    raw_id = raw_ids[heater]
    registered = known_ids.get(raw_id)
    return ObjectIdU32.from_bytes(raw_id) if registered is None else registered
|
|
|
|
|
|
def prompt_heater() -> int:
    """Show the heater menu and ask for a heater number until it is valid.

    :return: Heater number in the range 0 to ``Heater.NUMBER_OF_SWITCHES - 1``.
    """
    menu = (
        "HEATER 0 | PLOC PROC Board",
        "HEATER 1 | PCDU Board",
        "HEATER 2 | ACS Board",
        "HEATER 3 | OBC Board",
        "HEATER 4 | CAMERA",
        "HEATER 5 | STR",
        "HEATER 6 | DRO",
        "HEATER 7 | Syrlinks",
    )
    while True:
        # The menu is printed again before every attempt.
        for entry in menu:
            print(entry)
        answer = input("Type number of heater to switch [0-7]: ")
        # isdecimal() instead of isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with a ValueError.
        if not answer.isdecimal():
            print("Heater number not a digit")
            continue
        heater_number = int(answer)
        if not 0 <= heater_number < Heater.NUMBER_OF_SWITCHES:
            print("Invalid heater switch number")
            continue
        return heater_number
|
|
|
|
|
|
def health_cmd(
    q: DefaultPusQueueHelper,
    heater_idx: int,
    object_id: ObjectIdU32,
    health: FsfwHealth,
    health_str: str,
):
    """Queue a service 201 set-health telecommand for one heater.

    :param q: Queue helper receiving the log entry and the telecommand.
    :param heater_idx: Heater number, only used for the log entry.
    :param object_id: Object ID of the heater whose health is set.
    :param health: Health state to command.
    :param health_str: Readable name of the health state for the log entry.
    """
    q.add_log_cmd(f"Setting Heater {heater_idx} {object_id} to {health_str}")
    set_health_tc = PusTelecommand(
        service=201,
        subservice=Subservice.TC_SET_HEALTH,
        app_data=pack_set_health_cmd_data(
            object_id=object_id.as_bytes, health=health
        ),
    )
    q.add_pus_tc(set_health_tc)
|
|
|
|
|
|
def pack_switch_heater_command(
    object_id: bytes, switch_nr: int, switch_action: int
) -> PusTelecommand:
    """Build the action telecommand which switches a heater.

    :param object_id: The object id of the HeaterHandler object.
    :param switch_nr: The switch number identifying the heater to switch
    :param switch_action: Action to perform. 0 - Sets switch off, 1 - Sets switch on.
    :return: Action command with the SWITCH_HEATER action ID.
    """
    # User data is three bytes: switch number, switch action, command source.
    user_data = bytearray(
        [switch_nr, switch_action, COMMAND_SOURCE_PARAM_EXTERNAL]
    )
    return create_action_cmd(
        object_id=object_id,
        action_id=ActionIds.SWITCH_HEATER,
        user_data=user_data,
    )
|