restructure TCS TMTC module

This commit is contained in:
Robin Müller 2023-01-31 15:41:51 +01:00
parent 2793b354a6
commit 3c20d2ea24
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
7 changed files with 146 additions and 131 deletions

View File

View File

@ -0,0 +1,57 @@
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import TCS_BOARD_ASS_ID
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode
class InfoAssy:
TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on"
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
class OpCodeAssy:
TCS_BOARD_ASS_NORMAL = ["nml"]
TCS_BOARD_ASS_OFF = ["off"]
@tmtc_definitions_provider
def add_tcs_assy_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(
keys=OpCodeAssy.TCS_BOARD_ASS_NORMAL,
info=InfoAssy.TCS_BOARD_ASS_NORMAL,
)
oce.add(
keys=OpCodeAssy.TCS_BOARD_ASS_OFF,
info=InfoAssy.TCS_BOARD_ASS_OFF,
)
defs.add_service(
name=CustomServiceList.TCS_ASS.value,
info="TCS Board Assembly",
op_code_entry=oce,
)
def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodeAssy.TCS_BOARD_ASS_NORMAL:
pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Mode.NORMAL,
submode=0,
q=q,
info=InfoAssy.TCS_BOARD_ASS_NORMAL,
)
if op_code in OpCodeAssy.TCS_BOARD_ASS_OFF:
pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Mode.OFF,
submode=0,
q=q,
info=InfoAssy.TCS_BOARD_ASS_OFF,
)

View File

@ -0,0 +1,7 @@
import enum
class CtrlSetId(enum.IntEnum):
PRIMARY_SENSORS = 0
DEVICE_SENSORS = 1
SUS_TEMP_SENSORS = 2

View File

@ -0,0 +1,74 @@
from .defs import CtrlSetId
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import TCS_CONTROLLER, TCS_SUBSYSTEM_ID
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from eive_tmtc.tmtc.tcs import pack_tcs_ass_cmds
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
class OpCodeSys:
OFF = ["off"]
NML = ["nml"]
REQUEST_PRIMARY_TEMP_SET = ["temp"]
REQUEST_DEVICE_TEMP_SET = ["temp_devs"]
REQUEST_DEVICE_SUS_SET = ["temp_sus"]
class InfoSys:
OFF = "Switch TCS subsystem OFF"
NML = "Switch TCS subsystem NORMAL (nominal)"
REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures"
REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures"
REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures"
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET:
sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS)
q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET)
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET)
q.add_pus_tc(
generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS))
)
if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET)
q.add_pus_tc(
generate_one_hk_command(
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
)
)
if op_code in OpCodeSys.OFF:
q.add_log_cmd(InfoSys.OFF)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
if op_code in OpCodeSys.NML:
q.add_log_cmd(InfoSys.NML)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
pack_tcs_ass_cmds(q, op_code)
@tmtc_definitions_provider
def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF)
oce.add(keys=OpCodeSys.NML, info=InfoSys.NML)
oce.add(
keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET
)
oce.add(
keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET
)
oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
defs.add_service(
name=CustomServiceList.TCS,
info="TCS Board",
op_code_entry=oce,
)

View File

@ -1,135 +1,10 @@
import enum
import pprint import pprint
import struct import struct
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode
from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid,
generate_one_hk_command,
)
from eive_tmtc.tmtc.common import pack_mode_cmd_with_info
from eive_tmtc.config.object_ids import (
TCS_BOARD_ASS_ID,
TCS_CONTROLLER,
TCS_SUBSYSTEM_ID,
)
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from .defs import CtrlSetId
class OpCodeAssy:
TCS_BOARD_ASS_NORMAL = ["nml"]
TCS_BOARD_ASS_OFF = ["off"]
class OpCodeSys:
OFF = ["off"]
NML = ["nml"]
REQUEST_PRIMARY_TEMP_SET = ["temp"]
REQUEST_DEVICE_TEMP_SET = ["temp_devs"]
REQUEST_DEVICE_SUS_SET = ["temp_sus"]
class InfoSys:
OFF = "Switch TCS subsystem OFF"
NML = "Switch TCS subsystem NORMAL (nominal)"
REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures"
REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures"
REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures"
class InfoAssy:
TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on"
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
class SetId(enum.IntEnum):
PRIMARY_SENSORS = 0
DEVICE_SENSORS = 1
SUS_TEMP_SENSORS = 2
@tmtc_definitions_provider
def add_tcs_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(
keys=OpCodeAssy.TCS_BOARD_ASS_NORMAL,
info=InfoAssy.TCS_BOARD_ASS_NORMAL,
)
oce.add(
keys=OpCodeAssy.TCS_BOARD_ASS_OFF,
info=InfoAssy.TCS_BOARD_ASS_OFF,
)
defs.add_service(
name=CustomServiceList.TCS_ASS.value,
info="TCS Board Assembly",
op_code_entry=oce,
)
oce = OpCodeEntry()
oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF)
oce.add(keys=OpCodeSys.NML, info=InfoSys.NML)
oce.add(
keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET
)
oce.add(
keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET
)
oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
defs.add_service(
name=CustomServiceList.TCS,
info="TCS Board",
op_code_entry=oce,
)
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET:
sensor_set_sid = make_sid(TCS_CONTROLLER, SetId.PRIMARY_SENSORS)
q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET)
q.add_pus_tc(generate_one_hk_command(sensor_set_sid))
if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET)
q.add_pus_tc(
generate_one_hk_command(make_sid(TCS_CONTROLLER, SetId.DEVICE_SENSORS))
)
if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET:
q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET)
q.add_pus_tc(
generate_one_hk_command(make_sid(TCS_CONTROLLER, SetId.SUS_TEMP_SENSORS))
)
if op_code in OpCodeSys.OFF:
q.add_log_cmd(InfoSys.OFF)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
if op_code in OpCodeSys.NML:
q.add_log_cmd(InfoSys.NML)
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
pack_tcs_ass_cmds(q, op_code)
def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodeAssy.TCS_BOARD_ASS_NORMAL:
pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Mode.NORMAL,
submode=0,
q=q,
info=InfoAssy.TCS_BOARD_ASS_NORMAL,
)
if op_code in OpCodeAssy.TCS_BOARD_ASS_OFF:
pack_mode_cmd_with_info(
object_id=TCS_BOARD_ASS_ID,
mode=Mode.OFF,
submode=0,
q=q,
info=InfoAssy.TCS_BOARD_ASS_OFF,
)
def handle_thermal_controller_hk_data( def handle_thermal_controller_hk_data(
@ -142,7 +17,7 @@ def handle_thermal_controller_hk_data(
if TCP_TEMP_DEV_SERVER: if TCP_TEMP_DEV_SERVER:
TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306) TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306)
""" """
if set_id == SetId.PRIMARY_SENSORS: if set_id == CtrlSetId.PRIMARY_SENSORS:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog("Received sensor temperature data") pw.dlog("Received sensor temperature data")
@ -174,7 +49,7 @@ def handle_thermal_controller_hk_data(
printer.file_logger.info(str(parsed_data)) printer.file_logger.info(str(parsed_data))
pp = pprint.PrettyPrinter(depth=4) pp = pprint.PrettyPrinter(depth=4)
pp.pprint(parsed_data) pp.pprint(parsed_data)
elif set_id == SetId.DEVICE_SENSORS: elif set_id == CtrlSetId.DEVICE_SENSORS:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog("Received device temperature data") pw.dlog("Received device temperature data")
fmt_str = "!fhhhhiiiifffhffffffffffffff" fmt_str = "!fhhhhiiiifffhffffffffffffff"
@ -211,7 +86,7 @@ def handle_thermal_controller_hk_data(
printer.file_logger.info(str(parsed_data)) printer.file_logger.info(str(parsed_data))
pp = pprint.PrettyPrinter(depth=4) pp = pprint.PrettyPrinter(depth=4)
pp.pprint(parsed_data) pp.pprint(parsed_data)
elif set_id == SetId.SUS_TEMP_SENSORS: elif set_id == CtrlSetId.SUS_TEMP_SENSORS:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog("Received SUS temperature data") pw.dlog("Received SUS temperature data")
fmt_str = "!ffffffffffffffffff" fmt_str = "!ffffffffffffffffff"

View File

@ -226,7 +226,7 @@ class TcHandler(TcHandlerBase):
default_pus_apid=PUS_APID, default_pus_apid=PUS_APID,
seq_cnt_provider=seq_count_provider, seq_cnt_provider=seq_count_provider,
pus_verificator=pus_verificator, pus_verificator=pus_verificator,
tc_sched_timestamp_len=4 tc_sched_timestamp_len=4,
) )
self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper
@ -335,7 +335,9 @@ def setup_params() -> SetupWrapper:
parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parent_parser()
parser_wrapper.create_default_parser() parser_wrapper.create_default_parser()
parser_wrapper.add_def_proc_and_cfdp_as_subparsers() parser_wrapper.add_def_proc_and_cfdp_as_subparsers()
post_arg_parsing_wrapper = parser_wrapper.parse(setup_params=params, hook_obj=hook_obj) post_arg_parsing_wrapper = parser_wrapper.parse(
setup_params=params, hook_obj=hook_obj
)
tmtccmd.init_printout(post_arg_parsing_wrapper.use_gui) tmtccmd.init_printout(post_arg_parsing_wrapper.use_gui)
use_prompts = not post_arg_parsing_wrapper.use_gui use_prompts = not post_arg_parsing_wrapper.use_gui
proc_param_wrapper = ProcedureParamsWrapper() proc_param_wrapper = ProcedureParamsWrapper()