diff --git a/pus_tc/system/proc.py b/pus_tc/system/proc.py index 79e1e24..5ffcba9 100644 --- a/pus_tc/system/proc.py +++ b/pus_tc/system/proc.py @@ -1,3 +1,4 @@ +from __future__ import annotations from typing import List from config.definitions import CustomServiceList @@ -78,17 +79,31 @@ PROC_INFO_DICT = { KAI.TCS_FT_OFF[0]: [OpCodes.TCS_FT_OFF, KAI.TCS_FT_OFF[1], 120.0, 10.0], KAI.ACS_FT[0]: [OpCodes.ACS_FT, KAI.ACS_FT[1], 120.0, 10.0], KAI.MGT_FT[0]: [OpCodes.MGT_FT, KAI.MGT_FT[1], 120.0, 10.0], - # collection_time for KAI.MGT_FT_DP maybe be reduced as a full 120seconds is not needed after MGTs are tested + # collection_time for KAI.MGT_FT_DP maybe be reduced as a full 120 + # seconds is not needed after MGTs are tested KAI.MGT_FT_DP[0]: [OpCodes.MGT_FT_DP, KAI.MGT_FT_DP[1], 10.0, 10.0], KAI.SUS_FT[0]: [OpCodes.SUS_FT, KAI.SUS_FT[1], 120.0, 10.0], KAI.STR_FT[0]: [OpCodes.STR_FT, KAI.STR_FT[1], 120.0, 10.0], - # collection_time for KAI.RW_FT_ONE_RW maybe be reduced as a full 120seconds is not needed after RWs are tested + # collection_time for KAI.RW_FT_ONE_RW maybe be reduced as a full 120 + # seconds is not needed after RWs are tested KAI.RW_FT_ONE_RW[0]: [OpCodes.RW_FT_ONE_RW, KAI.RW_FT_ONE_RW[1], 10.0, 1.0], - # collection_time for KAI.RW_FT_ONE_RW maybe be reduced as a full 120seconds is not needed after RWs are tested + # collection_time for KAI.RW_FT_ONE_RW maybe be reduced as a full 120 + # seconds is not needed after RWs are tested KAI.RW_FT_TWO_RWS[0]: [OpCodes.RW_FT_TWO_RWS, KAI.RW_FT_TWO_RWS[1], 10.0, 1.0], } +class GenericHkListeningCfg: + def __init__(self, mgt: bool, one_rw: bool, two_rws: bool): + self.mgt = mgt + self.one_rw = one_rw + self.two_rws = two_rws + + @classmethod + def default(cls) -> GenericHkListeningCfg: + return GenericHkListeningCfg(False, False, False) + + def generic_print(tc_queue: TcQueueT, info: dict): tc_queue.appendleft( (QueueCommands.PRINT, f"Executing {info[1]} Procedure (OpCodes: {info[0]})") @@ -115,9 +130,7 @@ def pack_generic_hk_listening_cmds( proc_key: str, sid_list: list[bytearray], diag: bool, - mgt: bool, - one_rw: bool, - two_rws: bool, + cfg: GenericHkListeningCfg, ): info = PROC_INFO_DICT[proc_key] collection_time = info[2] @@ -132,18 +145,17 @@ def pack_generic_hk_listening_cmds( interval_seconds=info[3], ) - if mgt is True: + if cfg.mgt: activate_mgts_alternately( tc_queue=tc_queue, ) - elif one_rw is True: + elif cfg.one_rw: activate_all_rws_in_sequence( tc_queue=tc_queue, test_speed=20000, test_ramp_time=10000, init_ssc=0 ) - elif two_rws is True: + elif cfg.two_rws: activate_all_rws_two_consecutively(tc_queue=tc_queue, init_ssc=0) - else: pass @@ -170,9 +182,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) if op_code in OpCodes.CORE_FT: @@ -183,9 +193,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) if op_code in OpCodes.PCDU_FT: @@ -203,9 +211,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) if op_code in OpCodes.RAD_SEN_FT: @@ -216,9 +222,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) if op_code in OpCodes.TCS_FT_ON: @@ -250,9 +254,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="acs-off") @@ -273,9 +275,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="acs-off") @@ -302,9 +302,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="acs-off") @@ -318,14 +316,10 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.ENG_HK_SET)) sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.CAL_MTM_SET)) sid_list.append(make_sid(oids.IMTQ_HANDLER_ID, ImtqSetIds.RAW_MTM_SET)) + cfg = GenericHkListeningCfg.default() + cfg.mgt = True pack_generic_hk_listening_cmds( - tc_queue=tc_queue, - proc_key=key, - sid_list=sid_list, - diag=False, - mgt=False, - one_rw=False, - two_rws=False, + tc_queue=tc_queue, proc_key=key, sid_list=sid_list, diag=False, cfg=cfg ) pack_imtq_test_into(oids.IMTQ_HANDLER_ID, tc_queue=tc_queue, op_code="0") @@ -347,9 +341,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=True, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_imtq_test_into(oids.IMTQ_HANDLER_ID, tc_queue=tc_queue, op_code="0") @@ -384,9 +376,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="sus-off") @@ -401,9 +391,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="sus-off") @@ -420,9 +408,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_acs_command(tc_queue=tc_queue, op_code="sus-off") @@ -441,9 +427,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=False, + cfg=GenericHkListeningCfg.default(), ) pack_star_tracker_commands( @@ -478,9 +462,7 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=True, - two_rws=False, + cfg=GenericHkListeningCfg(mgt=False, one_rw=True, two_rws=False), ) # RW OFF pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off") @@ -513,40 +495,12 @@ def pack_proc_commands(tc_queue: TcQueueT, op_code: str): proc_key=key, sid_list=sid_list, diag=False, - mgt=False, - one_rw=False, - two_rws=True, + cfg=GenericHkListeningCfg(mgt=False, one_rw=False, two_rws=True), ) # RW OFF pack_rw_ass_cmds(object_id=oids.RW_ASSEMBLY, tc_queue=tc_queue, op_code="off") -""" -def listen_to_hk_for_x_seconds( - tc_queue: TcQueueT, - diag: bool, - device: str, - sid: bytes, - interval_seconds: float, - collection_time: float, -): - - tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) - cmd_tuple = enable_periodic_hk_command_with_interval( - diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 - ) - for cmd in cmd_tuple: - tc_queue.appendleft(cmd.pack_command_tuple()) - - tc_queue.appendleft((QueueCommands.WAIT, collection_time)) - - tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) - tc_queue.appendleft( - disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() - ) -""" - - def enable_listen_to_hk_for_x_seconds( tc_queue: TcQueueT, diag: bool, @@ -561,8 +515,6 @@ def enable_listen_to_hk_for_x_seconds( for cmd in cmd_tuple: tc_queue.appendleft(cmd.pack_command_tuple()) - tc_queue.appendleft((QueueCommands.WAIT, 2.0)) - def disable_listen_to_hk_for_x_seconds( tc_queue: TcQueueT,