diff --git a/eive_tmtc/tmtc/tcs/__init__.py b/eive_tmtc/tmtc/tcs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eive_tmtc/tmtc/tcs/brd_assy.py b/eive_tmtc/tmtc/tcs/brd_assy.py new file mode 100644 index 0000000..2aeb01f --- /dev/null +++ b/eive_tmtc/tmtc/tcs/brd_assy.py @@ -0,0 +1,57 @@ +from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.config.object_ids import TCS_BOARD_ASS_ID +from eive_tmtc.tmtc.common import pack_mode_cmd_with_info +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) +from tmtccmd.tc import DefaultPusQueueHelper +from tmtccmd.tc.pus_200_fsfw_mode import Mode + + +class InfoAssy: + TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on" + TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" + + +class OpCodeAssy: + TCS_BOARD_ASS_NORMAL = ["nml"] + TCS_BOARD_ASS_OFF = ["off"] + + +@tmtc_definitions_provider +def add_tcs_assy_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add( + keys=OpCodeAssy.TCS_BOARD_ASS_NORMAL, + info=InfoAssy.TCS_BOARD_ASS_NORMAL, + ) + oce.add( + keys=OpCodeAssy.TCS_BOARD_ASS_OFF, + info=InfoAssy.TCS_BOARD_ASS_OFF, + ) + defs.add_service( + name=CustomServiceList.TCS_ASS.value, + info="TCS Board Assembly", + op_code_entry=oce, + ) + + +def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str): + if op_code in OpCodeAssy.TCS_BOARD_ASS_NORMAL: + pack_mode_cmd_with_info( + object_id=TCS_BOARD_ASS_ID, + mode=Mode.NORMAL, + submode=0, + q=q, + info=InfoAssy.TCS_BOARD_ASS_NORMAL, + ) + if op_code in OpCodeAssy.TCS_BOARD_ASS_OFF: + pack_mode_cmd_with_info( + object_id=TCS_BOARD_ASS_ID, + mode=Mode.OFF, + submode=0, + q=q, + info=InfoAssy.TCS_BOARD_ASS_OFF, + ) diff --git a/eive_tmtc/tmtc/tcs/defs.py b/eive_tmtc/tmtc/tcs/defs.py new file mode 100644 index 0000000..eb0f2d7 --- /dev/null +++ b/eive_tmtc/tmtc/tcs/defs.py @@ -0,0 +1,7 @@ +import enum + + +class CtrlSetId(enum.IntEnum): + PRIMARY_SENSORS = 0 + DEVICE_SENSORS = 1 + SUS_TEMP_SENSORS = 2 diff --git a/eive_tmtc/pus_tc/devs/heater.py b/eive_tmtc/tmtc/tcs/heater.py similarity index 100% rename from eive_tmtc/pus_tc/devs/heater.py rename to eive_tmtc/tmtc/tcs/heater.py diff --git a/eive_tmtc/tmtc/tcs/subsystem.py b/eive_tmtc/tmtc/tcs/subsystem.py new file mode 100644 index 0000000..ebaad19 --- /dev/null +++ b/eive_tmtc/tmtc/tcs/subsystem.py @@ -0,0 +1,74 @@ +from .defs import CtrlSetId +from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.config.object_ids import TCS_CONTROLLER, TCS_SUBSYSTEM_ID +from eive_tmtc.tmtc.common import pack_mode_cmd_with_info +from eive_tmtc.tmtc.tcs import pack_tcs_ass_cmds +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + TmtcDefinitionWrapper, + OpCodeEntry, +) +from tmtccmd.tc import DefaultPusQueueHelper +from tmtccmd.tc.pus_200_fsfw_mode import Mode +from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command + + +class OpCodeSys: + OFF = ["off"] + NML = ["nml"] + REQUEST_PRIMARY_TEMP_SET = ["temp"] + REQUEST_DEVICE_TEMP_SET = ["temp_devs"] + REQUEST_DEVICE_SUS_SET = ["temp_sus"] + + +class InfoSys: + OFF = "Switch TCS subsystem OFF" + NML = "Switch TCS subsystem NORMAL (nominal)" + REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures" + REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures" + REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures" + + +def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str): + if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET: + sensor_set_sid = make_sid(TCS_CONTROLLER, CtrlSetId.PRIMARY_SENSORS) + q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET) + q.add_pus_tc(generate_one_hk_command(sensor_set_sid)) + if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET: + q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET) + q.add_pus_tc( + generate_one_hk_command(make_sid(TCS_CONTROLLER, CtrlSetId.DEVICE_SENSORS)) + ) + if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET: + q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET) + q.add_pus_tc( + generate_one_hk_command( + make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS) + ) + ) + if op_code in OpCodeSys.OFF: + q.add_log_cmd(InfoSys.OFF) + pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF) + if op_code in OpCodeSys.NML: + q.add_log_cmd(InfoSys.NML) + pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF) + pack_tcs_ass_cmds(q, op_code) + + +@tmtc_definitions_provider +def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF) + oce.add(keys=OpCodeSys.NML, info=InfoSys.NML) + oce.add( + keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET + ) + oce.add( + keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET + ) + oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET) + defs.add_service( + name=CustomServiceList.TCS, + info="TCS Board", + op_code_entry=oce, + ) diff --git a/eive_tmtc/tmtc/tcs.py b/eive_tmtc/tmtc/tcs/tm.py similarity index 50% rename from eive_tmtc/tmtc/tcs.py rename to eive_tmtc/tmtc/tcs/tm.py index f5c589a..4523423 100644 --- a/eive_tmtc/tmtc/tcs.py +++ b/eive_tmtc/tmtc/tcs/tm.py @@ -1,135 +1,10 @@ -import enum import pprint import struct from eive_tmtc.pus_tm.defs import PrintWrapper - -from eive_tmtc.config.definitions import CustomServiceList -from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry -from tmtccmd.config.tmtc import tmtc_definitions_provider -from tmtccmd.tc import DefaultPusQueueHelper -from tmtccmd.tc.pus_200_fsfw_mode import Mode -from tmtccmd.tc.pus_3_fsfw_hk import ( - make_sid, - generate_one_hk_command, -) - -from eive_tmtc.tmtc.common import pack_mode_cmd_with_info -from eive_tmtc.config.object_ids import ( - TCS_BOARD_ASS_ID, - TCS_CONTROLLER, - TCS_SUBSYSTEM_ID, -) from tmtccmd.util import ObjectIdU32 from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter - - -class OpCodeAssy: - TCS_BOARD_ASS_NORMAL = ["nml"] - TCS_BOARD_ASS_OFF = ["off"] - - -class OpCodeSys: - OFF = ["off"] - NML = ["nml"] - REQUEST_PRIMARY_TEMP_SET = ["temp"] - REQUEST_DEVICE_TEMP_SET = ["temp_devs"] - REQUEST_DEVICE_SUS_SET = ["temp_sus"] - - -class InfoSys: - OFF = "Switch TCS subsystem OFF" - NML = "Switch TCS subsystem NORMAL (nominal)" - REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures" - REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures" - REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures" - - -class InfoAssy: - TCS_BOARD_ASS_NORMAL = "Switching TCS board assembly on" - TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" - - -class SetId(enum.IntEnum): - PRIMARY_SENSORS = 0 - DEVICE_SENSORS = 1 - SUS_TEMP_SENSORS = 2 - - -@tmtc_definitions_provider -def add_tcs_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add( - keys=OpCodeAssy.TCS_BOARD_ASS_NORMAL, - info=InfoAssy.TCS_BOARD_ASS_NORMAL, - ) - oce.add( - keys=OpCodeAssy.TCS_BOARD_ASS_OFF, - info=InfoAssy.TCS_BOARD_ASS_OFF, - ) - defs.add_service( - name=CustomServiceList.TCS_ASS.value, - info="TCS Board Assembly", - op_code_entry=oce, - ) - oce = OpCodeEntry() - oce.add(keys=OpCodeSys.OFF, info=InfoSys.OFF) - oce.add(keys=OpCodeSys.NML, info=InfoSys.NML) - oce.add( - keys=OpCodeSys.REQUEST_PRIMARY_TEMP_SET, info=InfoSys.REQUEST_PRIMARY_TEMP_SET - ) - oce.add( - keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET - ) - oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET) - defs.add_service( - name=CustomServiceList.TCS, - info="TCS Board", - op_code_entry=oce, - ) - - -def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str): - if op_code in OpCodeSys.REQUEST_PRIMARY_TEMP_SET: - sensor_set_sid = make_sid(TCS_CONTROLLER, SetId.PRIMARY_SENSORS) - q.add_log_cmd(InfoSys.REQUEST_PRIMARY_TEMP_SET) - q.add_pus_tc(generate_one_hk_command(sensor_set_sid)) - if op_code in OpCodeSys.REQUEST_DEVICE_TEMP_SET: - q.add_log_cmd(InfoSys.REQUEST_DEVICE_TEMP_SET) - q.add_pus_tc( - generate_one_hk_command(make_sid(TCS_CONTROLLER, SetId.DEVICE_SENSORS)) - ) - if op_code in OpCodeSys.REQUEST_DEVICE_SUS_SET: - q.add_log_cmd(InfoSys.REQUEST_DEVICE_SUS_SET) - q.add_pus_tc( - generate_one_hk_command(make_sid(TCS_CONTROLLER, SetId.SUS_TEMP_SENSORS)) - ) - if op_code in OpCodeSys.OFF: - q.add_log_cmd(InfoSys.OFF) - pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF) - if op_code in OpCodeSys.NML: - q.add_log_cmd(InfoSys.NML) - pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF) - pack_tcs_ass_cmds(q, op_code) - - -def pack_tcs_ass_cmds(q: DefaultPusQueueHelper, op_code: str): - if op_code in OpCodeAssy.TCS_BOARD_ASS_NORMAL: - pack_mode_cmd_with_info( - object_id=TCS_BOARD_ASS_ID, - mode=Mode.NORMAL, - submode=0, - q=q, - info=InfoAssy.TCS_BOARD_ASS_NORMAL, - ) - if op_code in OpCodeAssy.TCS_BOARD_ASS_OFF: - pack_mode_cmd_with_info( - object_id=TCS_BOARD_ASS_ID, - mode=Mode.OFF, - submode=0, - q=q, - info=InfoAssy.TCS_BOARD_ASS_OFF, - ) +from .defs import CtrlSetId def handle_thermal_controller_hk_data( @@ -142,7 +17,7 @@ def handle_thermal_controller_hk_data( if TCP_TEMP_DEV_SERVER: TCP_TEMP_DEV_SERVER = TmTcpServer("localhost", 7306) """ - if set_id == SetId.PRIMARY_SENSORS: + if set_id == CtrlSetId.PRIMARY_SENSORS: pw = PrintWrapper(printer) pw.dlog("Received sensor temperature data") @@ -174,7 +49,7 @@ def handle_thermal_controller_hk_data( printer.file_logger.info(str(parsed_data)) pp = pprint.PrettyPrinter(depth=4) pp.pprint(parsed_data) - elif set_id == SetId.DEVICE_SENSORS: + elif set_id == CtrlSetId.DEVICE_SENSORS: pw = PrintWrapper(printer) pw.dlog("Received device temperature data") fmt_str = "!fhhhhiiiifffhffffffffffffff" @@ -211,7 +86,7 @@ def handle_thermal_controller_hk_data( printer.file_logger.info(str(parsed_data)) pp = pprint.PrettyPrinter(depth=4) pp.pprint(parsed_data) - elif set_id == SetId.SUS_TEMP_SENSORS: + elif set_id == CtrlSetId.SUS_TEMP_SENSORS: pw = PrintWrapper(printer) pw.dlog("Received SUS temperature data") fmt_str = "!ffffffffffffffffff" diff --git a/tmtcc.py b/tmtcc.py index 62c2d59..b36dd21 100755 --- a/tmtcc.py +++ b/tmtcc.py @@ -226,7 +226,7 @@ class TcHandler(TcHandlerBase): default_pus_apid=PUS_APID, seq_cnt_provider=seq_count_provider, pus_verificator=pus_verificator, - tc_sched_timestamp_len=4 + tc_sched_timestamp_len=4, ) self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper @@ -335,7 +335,9 @@ def setup_params() -> SetupWrapper: parser_wrapper.create_default_parent_parser() parser_wrapper.create_default_parser() parser_wrapper.add_def_proc_and_cfdp_as_subparsers() - post_arg_parsing_wrapper = parser_wrapper.parse(setup_params=params, hook_obj=hook_obj) + post_arg_parsing_wrapper = parser_wrapper.parse( + setup_params=params, hook_obj=hook_obj + ) tmtccmd.init_printout(post_arg_parsing_wrapper.use_gui) use_prompts = not post_arg_parsing_wrapper.use_gui proc_param_wrapper = ProcedureParamsWrapper()