TCS CTRL Info Set #215
@ -65,7 +65,8 @@ class CustomServiceList(str, enum.Enum):
|
|||||||
PL_SS = "pl_subsystem"
|
PL_SS = "pl_subsystem"
|
||||||
ACS_BRD_ASS = "acs_brd_ass"
|
ACS_BRD_ASS = "acs_brd_ass"
|
||||||
SUS_BRD_ASS = "sus_brd_ass"
|
SUS_BRD_ASS = "sus_brd_ass"
|
||||||
TCS = "tcs"
|
TCS_SS = "tcs_subsystem"
|
||||||
|
TCS_CTRL = "tcs_ctrl"
|
||||||
TCS_ASS = "tcs_ass"
|
TCS_ASS = "tcs_ass"
|
||||||
TIME = "time"
|
TIME = "time"
|
||||||
PROCEDURE = "proc"
|
PROCEDURE = "proc"
|
||||||
|
@ -6,6 +6,7 @@ from typing import cast
|
|||||||
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
|
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
|
||||||
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
|
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
|
||||||
from eive_tmtc.tmtc.power.power import pack_power_commands
|
from eive_tmtc.tmtc.power.power import pack_power_commands
|
||||||
|
from eive_tmtc.tmtc.tcs.ctrl import pack_tcs_ctrl_commands
|
||||||
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
|
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
|
||||||
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
|
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
|
||||||
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
|
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
|
||||||
@ -99,8 +100,10 @@ def handle_default_procedure( # noqa C901: Complexity okay here.
|
|||||||
if service == CustomServiceList.ACU.value:
|
if service == CustomServiceList.ACU.value:
|
||||||
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
|
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
|
||||||
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
|
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
|
||||||
if service == CustomServiceList.TCS.value:
|
if service == CustomServiceList.TCS_SS.value:
|
||||||
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
|
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
|
||||||
|
if service == CustomServiceList.TCS_CTRL.value:
|
||||||
|
return pack_tcs_ctrl_commands(q=queue_helper, op_code=op_code)
|
||||||
if service == CustomServiceList.TMP1075.value:
|
if service == CustomServiceList.TMP1075.value:
|
||||||
menu_dict = {
|
menu_dict = {
|
||||||
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
|
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),
|
||||||
|
@ -48,10 +48,7 @@ FORWARD_SENSOR_TEMPS = False
|
|||||||
|
|
||||||
|
|
||||||
def handle_hk_packet(
|
def handle_hk_packet(
|
||||||
raw_tm: bytes,
|
raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_level: int
|
||||||
obj_id_dict: ObjectIdDictT,
|
|
||||||
printer: FsfwTmTcPrinter,
|
|
||||||
hk_level: int
|
|
||||||
):
|
):
|
||||||
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
||||||
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
|
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
|
||||||
|
@ -29,7 +29,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
|
|||||||
verif_wrapper: VerificationWrapper,
|
verif_wrapper: VerificationWrapper,
|
||||||
printer: FsfwTmTcPrinter,
|
printer: FsfwTmTcPrinter,
|
||||||
raw_logger: RawTmtcTimedLogWrapper,
|
raw_logger: RawTmtcTimedLogWrapper,
|
||||||
hk_level: int
|
hk_level: int,
|
||||||
):
|
):
|
||||||
if len(packet) < 8:
|
if len(packet) < 8:
|
||||||
_LOGGER.warning("Detected packet shorter than 8 bytes!")
|
_LOGGER.warning("Detected packet shorter than 8 bytes!")
|
||||||
@ -49,7 +49,9 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
|
|||||||
if service == 1:
|
if service == 1:
|
||||||
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
|
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
|
||||||
elif service == 3:
|
elif service == 3:
|
||||||
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level)
|
handle_hk_packet(
|
||||||
|
printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level
|
||||||
|
)
|
||||||
elif service == 5:
|
elif service == 5:
|
||||||
handle_event_packet(raw_tm=packet, pw=pw)
|
handle_event_packet(raw_tm=packet, pw=pw)
|
||||||
elif service == 8:
|
elif service == 8:
|
||||||
|
92
eive_tmtc/tmtc/tcs/ctrl.py
Normal file
92
eive_tmtc/tmtc/tcs/ctrl.py
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
from eive_tmtc.config.definitions import CustomServiceList
|
||||||
|
from eive_tmtc.config.object_ids import TCS_CONTROLLER
|
||||||
|
from eive_tmtc.tmtc.tcs import CtrlSetId
|
||||||
|
from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds
|
||||||
|
from tmtccmd.config.tmtc import (
|
||||||
|
tmtc_definitions_provider,
|
||||||
|
TmtcDefinitionWrapper,
|
||||||
|
OpCodeEntry,
|
||||||
|
)
|
||||||
|
from tmtccmd.tc import DefaultPusQueueHelper
|
||||||
|
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||||
|
make_sid,
|
||||||
|
generate_one_hk_command,
|
||||||
|
create_request_one_diag_command,
|
||||||
|
create_enable_periodic_hk_command_with_interval,
|
||||||
|
create_request_one_hk_command,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class OpCode:
|
||||||
|
REQUEST_PRIMARY_TEMP_SET = "temp"
|
||||||
|
ENABLE_TEMP_SET = "enable_temp_set"
|
||||||
|
REQUEST_DEVICE_TEMP_SET = "temp_devs"
|
||||||
|
REQUEST_DEVICE_SUS_SET = "temp_sus"
|
||||||
|
REQUEST_HEATER_INFO = "heater_info"
|
||||||
|
REQUEST_TCS_CTRL_INFO = "tcs_ctrl_info"
|
||||||
|
|
||||||
|
|
||||||
|
class Info:
|
||||||
|
ENABLE_TEMP_SET = "Enable Primary Temperature Set"
|
||||||
|
REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures"
|
||||||
|
REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures"
|
||||||
|
REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures"
|
||||||
|
REQUEST_HEATER_INFO = "Request heater information"
|
||||||
|
REQUEST_TCS_CTRL_INFO = "Request TCS controller information"
|
||||||
|
|
||||||
|
|
||||||
|
def pack_tcs_ctrl_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||||
|
if op_code == OpCode.REQUEST_PRIMARY_TEMP_SET:
|
||||||
|
sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS)
|
||||||
|
q.add_log_cmd(Info.REQUEST_PRIMARY_TEMP_SET)
|
||||||
|
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
|
||||||
|
if op_code == OpCode.REQUEST_DEVICE_TEMP_SET:
|
||||||
|
q.add_log_cmd(Info.REQUEST_DEVICE_TEMP_SET)
|
||||||
|
q.add_pus_tc(
|
||||||
|
generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS))
|
||||||
|
)
|
||||||
|
if op_code == OpCode.REQUEST_DEVICE_SUS_SET:
|
||||||
|
q.add_log_cmd(Info.REQUEST_DEVICE_SUS_SET)
|
||||||
|
q.add_pus_tc(
|
||||||
|
generate_one_hk_command(
|
||||||
|
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if op_code == OpCode.REQUEST_HEATER_INFO:
|
||||||
|
q.add_log_cmd(Info.REQUEST_HEATER_INFO)
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_request_one_diag_command(
|
||||||
|
make_sid(TCS_CONTROLLER, CtrlSetId.HEATER_INFO)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if op_code == OpCode.REQUEST_TCS_CTRL_INFO:
|
||||||
|
q.add_log_cmd(Info.REQUEST_TCS_CTRL_INFO)
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_request_one_hk_command(
|
||||||
|
make_sid(TCS_CONTROLLER, CtrlSetId.TCS_CTRL_INFO)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if op_code == OpCode.ENABLE_TEMP_SET:
|
||||||
|
interval_seconds = float(input("Please specify interval in seconds: "))
|
||||||
|
cmds = create_enable_periodic_hk_command_with_interval(
|
||||||
|
False, make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS), interval_seconds
|
||||||
|
)
|
||||||
|
for cmd in cmds:
|
||||||
|
q.add_pus_tc(cmd)
|
||||||
|
pack_tcs_ass_cmds(q, op_code)
|
||||||
|
|
||||||
|
|
||||||
|
@tmtc_definitions_provider
|
||||||
|
def add_tcs_ctrl_cmds(defs: TmtcDefinitionWrapper):
|
||||||
|
oce = OpCodeEntry()
|
||||||
|
oce.add(keys=OpCode.ENABLE_TEMP_SET, info=Info.ENABLE_TEMP_SET)
|
||||||
|
oce.add(keys=OpCode.REQUEST_PRIMARY_TEMP_SET, info=Info.REQUEST_PRIMARY_TEMP_SET)
|
||||||
|
oce.add(keys=OpCode.REQUEST_DEVICE_TEMP_SET, info=Info.REQUEST_DEVICE_TEMP_SET)
|
||||||
|
oce.add(keys=OpCode.REQUEST_DEVICE_SUS_SET, info=Info.REQUEST_DEVICE_SUS_SET)
|
||||||
|
oce.add(keys=OpCode.REQUEST_HEATER_INFO, info=Info.REQUEST_HEATER_INFO)
|
||||||
|
oce.add(keys=OpCode.REQUEST_TCS_CTRL_INFO, info=Info.REQUEST_TCS_CTRL_INFO)
|
||||||
|
defs.add_service(
|
||||||
|
name=CustomServiceList.TCS_CTRL,
|
||||||
|
info="TCS controller",
|
||||||
|
op_code_entry=oce,
|
||||||
|
)
|
@ -12,3 +12,16 @@ class CtrlSetId(enum.IntEnum):
|
|||||||
class TcsSubmode(enum.IntEnum):
|
class TcsSubmode(enum.IntEnum):
|
||||||
DEFAULT = 0
|
DEFAULT = 0
|
||||||
NO_HEATER_CTRL = 1
|
NO_HEATER_CTRL = 1
|
||||||
|
|
||||||
|
|
||||||
|
class Heater(enum.IntEnum):
|
||||||
|
HEATER_0_PLOC_PROC_BRD = 0
|
||||||
|
HEATER_1_PCDU_BRD = 1
|
||||||
|
HEATER_2_ACS_BRD = 2
|
||||||
|
HEATER_3_OBC_BRD = 3
|
||||||
|
HEATER_4_CAMERA = 4
|
||||||
|
HEATER_5_STR = 5
|
||||||
|
HEATER_6_DRO = 6
|
||||||
|
HEATER_7_SYRLINKS = 7
|
||||||
|
NUMBER_OF_SWITCHES = 8
|
||||||
|
NONE = 0xFF
|
||||||
|
@ -7,6 +7,7 @@ import enum
|
|||||||
|
|
||||||
from eive_tmtc.config.definitions import CustomServiceList
|
from eive_tmtc.config.definitions import CustomServiceList
|
||||||
from eive_tmtc.config.object_ids import get_object_ids
|
from eive_tmtc.config.object_ids import get_object_ids
|
||||||
|
from eive_tmtc.tmtc.tcs.defs import Heater
|
||||||
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
|
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
|
||||||
from tmtccmd.config.tmtc import tmtc_definitions_provider
|
from tmtccmd.config.tmtc import tmtc_definitions_provider
|
||||||
from tmtccmd.tc import DefaultPusQueueHelper
|
from tmtccmd.tc import DefaultPusQueueHelper
|
||||||
@ -20,18 +21,6 @@ from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
|||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
|
|
||||||
class Heater(enum.IntEnum):
|
|
||||||
HEATER_0_PLOC_PROC_BRD = 0
|
|
||||||
HEATER_1_PCDU_BRD = 1
|
|
||||||
HEATER_2_ACS_BRD = 2
|
|
||||||
HEATER_3_OBC_BRD = 3
|
|
||||||
HEATER_4_CAMERA = 4
|
|
||||||
HEATER_5_STR = 5
|
|
||||||
HEATER_6_DRO = 6
|
|
||||||
HEATER_7_SYRLINKS = 7
|
|
||||||
NUMBER_OF_SWITCHES = 8
|
|
||||||
|
|
||||||
|
|
||||||
HEATER_LOCATION = [
|
HEATER_LOCATION = [
|
||||||
"PLOC Processing Board",
|
"PLOC Processing Board",
|
||||||
"PCDU PDU",
|
"PCDU PDU",
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from .defs import CtrlSetId
|
|
||||||
from eive_tmtc.config.definitions import CustomServiceList
|
from eive_tmtc.config.definitions import CustomServiceList
|
||||||
from eive_tmtc.config.object_ids import TCS_CONTROLLER, TCS_SUBSYSTEM_ID
|
from eive_tmtc.config.object_ids import TCS_SUBSYSTEM_ID
|
||||||
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
|
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
|
||||||
from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds
|
from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds
|
||||||
from tmtccmd.config.tmtc import (
|
from tmtccmd.config.tmtc import (
|
||||||
@ -10,74 +9,28 @@ from tmtccmd.config.tmtc import (
|
|||||||
)
|
)
|
||||||
from tmtccmd.tc import DefaultPusQueueHelper
|
from tmtccmd.tc import DefaultPusQueueHelper
|
||||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_announce_mode_recursive_command
|
from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_announce_mode_recursive_command
|
||||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
|
||||||
make_sid,
|
|
||||||
generate_one_hk_command,
|
|
||||||
create_enable_periodic_hk_command_with_interval,
|
|
||||||
create_request_one_diag_command,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class OpCodeSys:
|
class OpCode:
|
||||||
OFF = ["off"]
|
OFF = "off"
|
||||||
NML = ["nml"]
|
NML = "nml"
|
||||||
REQUEST_PRIMARY_TEMP_SET = ["temp"]
|
|
||||||
ENABLE_TEMP_SET = "enable_temp_set"
|
|
||||||
REQUEST_DEVICE_TEMP_SET = ["temp_devs"]
|
|
||||||
REQUEST_DEVICE_SUS_SET = ["temp_sus"]
|
|
||||||
REQUEST_HEATER_INFO = "heater_info"
|
|
||||||
ANNOUNCE_MODES = "announce_modes"
|
ANNOUNCE_MODES = "announce_modes"
|
||||||
|
|
||||||
|
|
||||||
class InfoSys:
|
class InfoSys:
|
||||||
OFF = "Switch TCS subsystem OFF"
|
OFF = "Switch TCS subsystem OFF"
|
||||||
NML = "Switch TCS subsystem NORMAL (nominal)"
|
NML = "Switch TCS subsystem NORMAL (nominal)"
|
||||||
ENABLE_TEMP_SET = "Enable Primary Temperature Set"
|
|
||||||
REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures"
|
|
||||||
REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures"
|
|
||||||
REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures"
|
|
||||||
REQUEST_HEATER_INFO = "Request heater information"
|
|
||||||
ANNOUNCE_MODES = "Announce Modes recursively"
|
ANNOUNCE_MODES = "Announce Modes recursively"
|
||||||
|
|
||||||
|
|
||||||
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||||
if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET:
|
if op_code == OpCode.OFF:
|
||||||
sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS)
|
|
||||||
q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET)
|
|
||||||
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
|
|
||||||
if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET:
|
|
||||||
q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET)
|
|
||||||
q.add_pus_tc(
|
|
||||||
generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS))
|
|
||||||
)
|
|
||||||
if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET:
|
|
||||||
q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET)
|
|
||||||
q.add_pus_tc(
|
|
||||||
generate_one_hk_command(
|
|
||||||
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if op_code == OpCodeSys.REQUEST_HEATER_INFO:
|
|
||||||
q.add_log_cmd(InfoSys.REQUEST_HEATER_INFO)
|
|
||||||
q.add_pus_tc(
|
|
||||||
create_request_one_diag_command(
|
|
||||||
make_sid(TCS_CONTROLLER, CtrlSetId.HEATER_INFO)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if op_code in OpCodeSys.OFF:
|
|
||||||
q.add_log_cmd(InfoSys.OFF)
|
q.add_log_cmd(InfoSys.OFF)
|
||||||
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
|
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
|
||||||
if op_code in OpCodeSys.NML:
|
if op_code == OpCode.NML:
|
||||||
q.add_log_cmd(InfoSys.NML)
|
q.add_log_cmd(InfoSys.NML)
|
||||||
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
|
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
|
||||||
if op_code == OpCodeSys.ENABLE_TEMP_SET:
|
if op_code == OpCode.ANNOUNCE_MODES:
|
||||||
interval_seconds = float(input("Please specify interval in seconds: "))
|
|
||||||
cmds = create_enable_periodic_hk_command_with_interval(
|
|
||||||
False, make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS), interval_seconds
|
|
||||||
)
|
|
||||||
for cmd in cmds:
|
|
||||||
q.add_pus_tc(cmd)
|
|
||||||
if op_code == OpCodeSys.ANNOUNCE_MODES:
|
|
||||||
q.add_log_cmd(InfoSys.ANNOUNCE_MODES)
|
q.add_log_cmd(InfoSys.ANNOUNCE_MODES)
|
||||||
q.add_pus_tc(create_announce_mode_recursive_command(TCS_SUBSYSTEM_ID))
|
q.add_pus_tc(create_announce_mode_recursive_command(TCS_SUBSYSTEM_ID))
|
||||||
pack_tcs_ass_cmds(q, op_code)
|
pack_tcs_ass_cmds(q, op_code)
|
||||||
@ -86,20 +39,11 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
|||||||
@tmtc_definitions_provider
|
@tmtc_definitions_provider
|
||||||
def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||||
oce = OpCodeEntry()
|
oce = OpCodeEntry()
|
||||||
oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF)
|
oce.add(keys=OpCode.OFF, info=InfoSys.OFF)
|
||||||
oce.add(keys=OpCodeSys.NML, info=InfoSys.NML)
|
oce.add(keys=OpCode.NML, info=InfoSys.NML)
|
||||||
oce.add(
|
oce.add(keys=OpCode.ANNOUNCE_MODES, info=InfoSys.ANNOUNCE_MODES)
|
||||||
keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET
|
|
||||||
)
|
|
||||||
oce.add(
|
|
||||||
keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET
|
|
||||||
)
|
|
||||||
oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
|
|
||||||
oce.add(keys=OpCodeSys.REQUEST_HEATER_INFO, info=InfoSys.REQUEST_HEATER_INFO)
|
|
||||||
oce.add(keys=OpCodeSys.ANNOUNCE_MODES, info=InfoSys.ANNOUNCE_MODES)
|
|
||||||
oce.add(keys=OpCodeSys.ENABLE_TEMP_SET, info=InfoSys.ENABLE_TEMP_SET)
|
|
||||||
defs.add_service(
|
defs.add_service(
|
||||||
name=CustomServiceList.TCS,
|
name=CustomServiceList.TCS_SS,
|
||||||
info="TCS",
|
info="TCS subsystem",
|
||||||
op_code_entry=oce,
|
op_code_entry=oce,
|
||||||
)
|
)
|
||||||
|
@ -1,7 +1,11 @@
|
|||||||
|
import dataclasses
|
||||||
|
import datetime
|
||||||
|
import enum
|
||||||
import logging
|
import logging
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||||
|
from eive_tmtc.tmtc.tcs.defs import Heater
|
||||||
from tmtccmd.fsfw import validity_buffer_list
|
from tmtccmd.fsfw import validity_buffer_list
|
||||||
from tmtccmd.util import ObjectIdU32
|
from tmtccmd.util import ObjectIdU32
|
||||||
from .defs import CtrlSetId
|
from .defs import CtrlSetId
|
||||||
@ -11,6 +15,45 @@ from .heater import HEATER_LOCATION
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ThermalComponent(enum.IntEnum):
|
||||||
|
NONE = 0
|
||||||
|
ACS_BOARD = 1
|
||||||
|
MGT = 2
|
||||||
|
RW = 3
|
||||||
|
STR = 4
|
||||||
|
IF_BOARD = 5
|
||||||
|
TCS_BOARD = 6
|
||||||
|
OBC = 7
|
||||||
|
LEGACY_OBCIF_BOARD = 8
|
||||||
|
SBAND_TRANSCEIVER = 9
|
||||||
|
PCDUP60_BOARD = 10
|
||||||
|
PCDUACU = 11
|
||||||
|
PCDUPDU = 12
|
||||||
|
PLPCDU_BOARD = 13
|
||||||
|
PLOCMISSION_BOARD = 14
|
||||||
|
PLOCPROCESSING_BOARD = 15
|
||||||
|
DAC = 16
|
||||||
|
CAMERA = 17
|
||||||
|
DRO = 18
|
||||||
|
X8 = 19
|
||||||
|
HPA = 20
|
||||||
|
TX = 21
|
||||||
|
MPA = 22
|
||||||
|
SCEX_BOARD = 23
|
||||||
|
NUM_ENTRIES = 24
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class TcsCtrlComponentInfo:
|
||||||
|
component: ThermalComponent
|
||||||
|
# Heater on or off?
|
||||||
|
state: bool
|
||||||
|
used_sensor_idx: int
|
||||||
|
used_heater: Heater
|
||||||
|
start_time: datetime.datetime
|
||||||
|
end_time: datetime.datetime
|
||||||
|
|
||||||
|
|
||||||
def handle_thermal_controller_hk_data(
|
def handle_thermal_controller_hk_data(
|
||||||
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
|
object_id: ObjectIdU32, pw: PrintWrapper, set_id: int, hk_data: bytes
|
||||||
):
|
):
|
||||||
@ -119,5 +162,50 @@ def handle_thermal_controller_hk_data(
|
|||||||
)
|
)
|
||||||
current_draw = struct.unpack("!H", hk_data[8:10])[0]
|
current_draw = struct.unpack("!H", hk_data[8:10])[0]
|
||||||
print(f"Heater Power Channel Current Draw: {current_draw} mA")
|
print(f"Heater Power Channel Current Draw: {current_draw} mA")
|
||||||
|
elif set_id == CtrlSetId.TCS_CTRL_INFO:
|
||||||
|
pw.dlog("Received TCS CTRL information set")
|
||||||
|
current_idx = 0
|
||||||
|
heater_states = hk_data[0 : ThermalComponent.NUM_ENTRIES]
|
||||||
|
current_idx += ThermalComponent.NUM_ENTRIES
|
||||||
|
used_sensor_idx = hk_data[
|
||||||
|
current_idx : current_idx + ThermalComponent.NUM_ENTRIES
|
||||||
|
]
|
||||||
|
current_idx += ThermalComponent.NUM_ENTRIES
|
||||||
|
used_heater_idx = hk_data[
|
||||||
|
current_idx : current_idx + ThermalComponent.NUM_ENTRIES
|
||||||
|
]
|
||||||
|
current_idx += ThermalComponent.NUM_ENTRIES
|
||||||
|
start_and_end_time_fmt_str = "!IIIIIIIIIIIIIIIIIIIIIIII"
|
||||||
|
data_len = struct.calcsize(start_and_end_time_fmt_str)
|
||||||
|
start_times = struct.unpack(
|
||||||
|
start_and_end_time_fmt_str, hk_data[current_idx : current_idx + data_len]
|
||||||
|
)
|
||||||
|
current_idx += data_len
|
||||||
|
end_times = struct.unpack(
|
||||||
|
start_and_end_time_fmt_str, hk_data[current_idx : current_idx + data_len]
|
||||||
|
)
|
||||||
|
current_idx += data_len
|
||||||
|
component_list = []
|
||||||
|
for i in range(ThermalComponent.NUM_ENTRIES):
|
||||||
|
info = TcsCtrlComponentInfo(
|
||||||
|
component=ThermalComponent(i),
|
||||||
|
state=bool(heater_states[i]),
|
||||||
|
used_sensor_idx=used_sensor_idx[i],
|
||||||
|
used_heater=Heater(used_heater_idx[i]),
|
||||||
|
start_time=datetime.datetime.fromtimestamp(
|
||||||
|
start_times[i], datetime.timezone.utc
|
||||||
|
),
|
||||||
|
end_time=datetime.datetime.fromtimestamp(
|
||||||
|
end_times[i], datetime.timezone.utc
|
||||||
|
),
|
||||||
|
)
|
||||||
|
component_str = f"{info.component!r}".ljust(46)
|
||||||
|
state_str = "ON" if info.state else "OFF"
|
||||||
|
pw.dlog(
|
||||||
|
f"{component_str}: {state_str.ljust(4)} | Sensor Index "
|
||||||
|
f"{info.used_sensor_idx} | {info.used_heater!r} | Start "
|
||||||
|
f"{info.start_time} | End {info.end_time}"
|
||||||
|
)
|
||||||
|
component_list.append(info)
|
||||||
else:
|
else:
|
||||||
_LOGGER.warning(f"Unimplemented set ID {set_id}")
|
_LOGGER.warning(f"Unimplemented set ID {set_id}")
|
||||||
|
8
tmtcc.py
8
tmtcc.py
@ -166,7 +166,7 @@ class PusHandler(SpecificApidHandlerBase):
|
|||||||
wrapper: VerificationWrapper,
|
wrapper: VerificationWrapper,
|
||||||
printer: FsfwTmTcPrinter,
|
printer: FsfwTmTcPrinter,
|
||||||
raw_logger: RawTmtcTimedLogWrapper,
|
raw_logger: RawTmtcTimedLogWrapper,
|
||||||
hk_level: int
|
hk_level: int,
|
||||||
):
|
):
|
||||||
super().__init__(PUS_APID, None)
|
super().__init__(PUS_APID, None)
|
||||||
self.printer = printer
|
self.printer = printer
|
||||||
@ -177,7 +177,9 @@ class PusHandler(SpecificApidHandlerBase):
|
|||||||
def handle_tm(self, packet: bytes, _user_args: any):
|
def handle_tm(self, packet: bytes, _user_args: any):
|
||||||
# with open("tc.bin", "wb") as of:
|
# with open("tc.bin", "wb") as of:
|
||||||
# of.write(packet)
|
# of.write(packet)
|
||||||
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level)
|
pus_factory_hook(
|
||||||
|
packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class UnknownApidHandler(GenericApidHandlerBase):
|
class UnknownApidHandler(GenericApidHandlerBase):
|
||||||
@ -408,7 +410,7 @@ def setup_tmtc_handlers(
|
|||||||
printer: FsfwTmTcPrinter,
|
printer: FsfwTmTcPrinter,
|
||||||
raw_logger: RawTmtcTimedLogWrapper,
|
raw_logger: RawTmtcTimedLogWrapper,
|
||||||
gui: bool,
|
gui: bool,
|
||||||
hk_level: int
|
hk_level: int,
|
||||||
) -> (CcsdsTmHandler, TcHandler):
|
) -> (CcsdsTmHandler, TcHandler):
|
||||||
cfdp_in_ccsds_wrapper = setup_cfdp_handler()
|
cfdp_in_ccsds_wrapper = setup_cfdp_handler()
|
||||||
verification_wrapper = VerificationWrapper(
|
verification_wrapper = VerificationWrapper(
|
||||||
|
Loading…
Reference in New Issue
Block a user