add command demultiplexing for tcs ctrl

This commit is contained in:
Robin Müller 2023-07-10 16:25:44 +02:00
parent 8d6dd97d85
commit 7e9c626ec8
Signed by: muellerr
GPG Key ID: 407F9B00F858F270
8 changed files with 103 additions and 74 deletions

View File

@ -65,7 +65,8 @@ class CustomServiceList(str, enum.Enum):
PL_SS = "pl_subsystem" PL_SS = "pl_subsystem"
ACS_BRD_ASS = "acs_brd_ass" ACS_BRD_ASS = "acs_brd_ass"
SUS_BRD_ASS = "sus_brd_ass" SUS_BRD_ASS = "sus_brd_ass"
TCS = "tcs" TCS_SS = "tcs_subsystem"
TCS_CTRL = "tcs_ctrl"
TCS_ASS = "tcs_ass" TCS_ASS = "tcs_ass"
TIME = "time" TIME = "time"
PROCEDURE = "proc" PROCEDURE = "proc"

View File

@ -6,6 +6,7 @@ from typing import cast
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
from eive_tmtc.tmtc.power.power import pack_power_commands from eive_tmtc.tmtc.power.power import pack_power_commands
from eive_tmtc.tmtc.tcs.ctrl import pack_tcs_ctrl_commands
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands from eive_tmtc.tmtc.tcs.subsystem import pack_tcs_sys_commands
@ -99,8 +100,10 @@ def handle_default_procedure( # noqa C901: Complexity okay here.
if service == CustomServiceList.ACU.value: if service == CustomServiceList.ACU.value:
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID)) object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code) return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS.value: if service == CustomServiceList.TCS_SS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code) return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS_CTRL.value:
return pack_tcs_ctrl_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TMP1075.value: if service == CustomServiceList.TMP1075.value:
menu_dict = { menu_dict = {
"0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID), "0": ("TMP1075 TCS Board 0", TMP1075_HANDLER_TCS_BRD_0_ID),

View File

@ -48,10 +48,7 @@ FORWARD_SENSOR_TEMPS = False
def handle_hk_packet( def handle_hk_packet(
raw_tm: bytes, raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_level: int
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
hk_level: int
): ):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)

View File

@ -29,7 +29,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
verif_wrapper: VerificationWrapper, verif_wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
hk_level: int hk_level: int,
): ):
if len(packet) < 8: if len(packet) < 8:
_LOGGER.warning("Detected packet shorter than 8 bytes!") _LOGGER.warning("Detected packet shorter than 8 bytes!")
@ -49,7 +49,9 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
if service == 1: if service == 1:
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet) handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
elif service == 3: elif service == 3:
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level) handle_hk_packet(
printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level
)
elif service == 5: elif service == 5:
handle_event_packet(raw_tm=packet, pw=pw) handle_event_packet(raw_tm=packet, pw=pw)
elif service == 8: elif service == 8:

View File

@ -0,0 +1,74 @@
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import TCS_CONTROLLER
from eive_tmtc.tmtc.tcs import CtrlSetId
from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds
from eive_tmtc.tmtc.tcs.subsystem import InfoSys
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
generate_one_hk_command,
create_request_one_diag_command,
create_enable_periodic_hk_command_with_interval,
)
class OpCode:
REQUEST_PRIMARY_TEMP_SET = "temp"
ENABLE_TEMP_SET = "enable_temp_set"
REQUEST_DEVICE_TEMP_SET = "temp_devs"
REQUEST_DEVICE_SUS_SET = "temp_sus"
REQUEST_HEATER_INFO = "heater_info"
def pack_tcs_ctrl_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code == OpCode.REQUEST_PRIMARY_TEMP_SET:
sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS)
q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET)
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
if op_code == OpCode.REQUEST_DEVICE_TEMP_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET)
q.add_pus_tc(
generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS))
)
if op_code == OpCode.REQUEST_DEVICE_SUS_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET)
q.add_pus_tc(
generate_one_hk_command(
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
)
)
if op_code == OpCode.REQUEST_HEATER_INFO:
q.add_log_cmd(InfoSys.REQUEST_HEATER_INFO)
q.add_pus_tc(
create_request_one_diag_command(
make_sid(TCS_CONTROLLER, CtrlSetId.HEATER_INFO)
)
)
if op_code == OpCode.ENABLE_TEMP_SET:
interval_seconds = float(input("Please specify interval in seconds: "))
cmds = create_enable_periodic_hk_command_with_interval(
False, make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS), interval_seconds
)
for cmd in cmds:
q.add_pus_tc(cmd)
pack_tcs_ass_cmds(q, op_code)
@tmtc_definitions_provider
def add_tcs_ctrl_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.ENABLE_TEMP_SET, info=InfoSys.ENABLE_TEMP_SET)
oce.add(keys=OpCode.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET)
oce.add(keys=OpCode.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET)
oce.add(keys=OpCode.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
oce.add(keys=OpCode.REQUEST_HEATER_INFO, info=InfoSys.REQUEST_HEATER_INFO)
defs.add_service(
name=CustomServiceList.TCS_CTRL,
info="TCS controller",
op_code_entry=oce,
)

View File

@ -1,6 +1,5 @@
from .defs import CtrlSetId
from eive_tmtc.config.definitions import CustomServiceList from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import TCS_CONTROLLER, TCS_SUBSYSTEM_ID from eive_tmtc.config.object_ids import TCS_SUBSYSTEM_ID
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds from eive_tmtc.tmtc.tcs.brd_assy import pack_tcs_ass_cmds
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
@ -10,22 +9,11 @@ from tmtccmd.config.tmtc import (
) )
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_announce_mode_recursive_command from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_announce_mode_recursive_command
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
generate_one_hk_command,
create_enable_periodic_hk_command_with_interval,
create_request_one_diag_command,
)
class OpCodeSys: class OpCode:
OFF = ["off"] OFF = "off"
NML = ["nml"] NML = "nml"
REQUEST_PRIMARY_TEMP_SET = ["temp"]
ENABLE_TEMP_SET = "enable_temp_set"
REQUEST_DEVICE_TEMP_SET = ["temp_devs"]
REQUEST_DEVICE_SUS_SET = ["temp_sus"]
REQUEST_HEATER_INFO = "heater_info"
ANNOUNCE_MODES = "announce_modes" ANNOUNCE_MODES = "announce_modes"
@ -41,43 +29,13 @@ class InfoSys:
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str): def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET: if op_code == OpCode.OFF:
sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS)
q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET)
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET)
q.add_pus_tc(
generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS))
)
if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET)
q.add_pus_tc(
generate_one_hk_command(
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
)
)
if op_code == OpCodeSys.REQUEST_HEATER_INFO:
q.add_log_cmd(InfoSys.REQUEST_HEATER_INFO)
q.add_pus_tc(
create_request_one_diag_command(
make_sid(TCS_CONTROLLER, CtrlSetId.HEATER_INFO)
)
)
if op_code in OpCodeSys.OFF:
q.add_log_cmd(InfoSys.OFF) q.add_log_cmd(InfoSys.OFF)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF) pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
if op_code in OpCodeSys.NML: if op_code == OpCode.NML:
q.add_log_cmd(InfoSys.NML) q.add_log_cmd(InfoSys.NML)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF) pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
if op_code == OpCodeSys.ENABLE_TEMP_SET: if op_code == OpCode.ANNOUNCE_MODES:
interval_seconds = float(input("Please specify interval in seconds: "))
cmds = create_enable_periodic_hk_command_with_interval(
False, make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS), interval_seconds
)
for cmd in cmds:
q.add_pus_tc(cmd)
if op_code == OpCodeSys.ANNOUNCE_MODES:
q.add_log_cmd(InfoSys.ANNOUNCE_MODES) q.add_log_cmd(InfoSys.ANNOUNCE_MODES)
q.add_pus_tc(create_announce_mode_recursive_command(TCS_SUBSYSTEM_ID)) q.add_pus_tc(create_announce_mode_recursive_command(TCS_SUBSYSTEM_ID))
pack_tcs_ass_cmds(q, op_code) pack_tcs_ass_cmds(q, op_code)
@ -86,20 +44,11 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
@tmtc_definitions_provider @tmtc_definitions_provider
def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper): def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF) oce.add(keys=OpCode.OFF, info=InfoSys.OFF)
oce.add(keys=OpCodeSys.NML, info=InfoSys.NML) oce.add(keys=OpCode.NML, info=InfoSys.NML)
oce.add( oce.add(keys=OpCode.ANNOUNCE_MODES, info=InfoSys.ANNOUNCE_MODES)
keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET
)
oce.add(
keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET
)
oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
oce.add(keys=OpCodeSys.REQUEST_HEATER_INFO, info=InfoSys.REQUEST_HEATER_INFO)
oce.add(keys=OpCodeSys.ANNOUNCE_MODES, info=InfoSys.ANNOUNCE_MODES)
oce.add(keys=OpCodeSys.ENABLE_TEMP_SET, info=InfoSys.ENABLE_TEMP_SET)
defs.add_service( defs.add_service(
name=CustomServiceList.TCS, name=CustomServiceList.TCS_SS,
info="TCS", info="TCS subsystem",
op_code_entry=oce, op_code_entry=oce,
) )

View File

@ -149,6 +149,7 @@ def handle_thermal_controller_hk_data(
current_draw = struct.unpack("!H", hk_data[8:10])[0] current_draw = struct.unpack("!H", hk_data[8:10])[0]
print(f"Heater Power Channel Current Draw: {current_draw} mA") print(f"Heater Power Channel Current Draw: {current_draw} mA")
elif set_id == CtrlSetId.TCS_CTRL_INFO: elif set_id == CtrlSetId.TCS_CTRL_INFO:
pw.dlog("Received TCS CTRL set")
pass pass
else: else:
_LOGGER.warning(f"Unimplemented set ID {set_id}") _LOGGER.warning(f"Unimplemented set ID {set_id}")

View File

@ -166,7 +166,7 @@ class PusHandler(SpecificApidHandlerBase):
wrapper: VerificationWrapper, wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
hk_level: int hk_level: int,
): ):
super().__init__(PUS_APID, None) super().__init__(PUS_APID, None)
self.printer = printer self.printer = printer
@ -177,7 +177,9 @@ class PusHandler(SpecificApidHandlerBase):
def handle_tm(self, packet: bytes, _user_args: any): def handle_tm(self, packet: bytes, _user_args: any):
# with open("tc.bin", "wb") as of: # with open("tc.bin", "wb") as of:
# of.write(packet) # of.write(packet)
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level) pus_factory_hook(
packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level
)
class UnknownApidHandler(GenericApidHandlerBase): class UnknownApidHandler(GenericApidHandlerBase):
@ -408,7 +410,7 @@ def setup_tmtc_handlers(
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
gui: bool, gui: bool,
hk_level: int hk_level: int,
) -> (CcsdsTmHandler, TcHandler): ) -> (CcsdsTmHandler, TcHandler):
cfdp_in_ccsds_wrapper = setup_cfdp_handler() cfdp_in_ccsds_wrapper = setup_cfdp_handler()
verification_wrapper = VerificationWrapper( verification_wrapper = VerificationWrapper(