from tmtccmd.config import QueueCommands from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_3_fsfw_hk import * from config.object_ids import ( BPX_HANDLER_ID, P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, CORE_CONTROLLER_ID, ) import config.object_ids as oids from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.system.core import SetIds as CoreSetIds from gomspace.gomspace_common import SetIds as GsSetIds class OpCodes: HEATER = ["0", "heater"] BAT_FT = ["bat-ft"] CORE_FT = ["core-ft"] PCDU_FT = ["pcdu-ft"] class KeyAndInfo: HEATER = ["Heater", "heater procedure"] BAT_FT = ["BPX Battery", "battery functional test"] PCDU_FT = ["PCDU", "PCDU functional test"] CORE_FT = ["OBC", "OBC functional test"] KAI = KeyAndInfo PROC_INFO_DICT = { KAI.BAT_FT[0]: [OpCodes.BAT_FT, KAI.BAT_FT[1], 120.0, 10.0], KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KeyAndInfo.PCDU_FT[1], 120.0, 10.0], KAI.CORE_FT[0]: [OpCodes.CORE_FT, KeyAndInfo.CORE_FT[1], 120.0, 10.0], } def generic_print(tc_queue: TcQueueT, key: str): info = PROC_INFO_DICT[key] tc_queue.appendleft( (QueueCommands.PRINT, f"Executing {info[0]} Procedure (OpCodes: {info[1]})") ) def pack_generic_hk_listening_cmds( tc_queue: TcQueueT, proc_key: str, sid: bytes, diag: bool ): generic_print(tc_queue=tc_queue, key=proc_key) listen_to_hk_for_x_seconds( diag=diag, tc_queue=tc_queue, device=proc_key, sid=sid, interval_seconds=10.0, collection_time=120.0, ) def pack_proc_commands(tc_queue: TcQueueT, op_code: str): if op_code in OpCodes.BAT_FT: key = KAI.BAT_FT[0] sid = make_sid(BPX_HANDLER_ID, BpxSetIds.GET_HK_SET) pack_generic_hk_listening_cmds( tc_queue=tc_queue, proc_key=key, sid=sid, diag=False ) if op_code in OpCodes.PCDU_FT: key = KAI.PCDU_FT[0] sid = make_sid(P60_DOCK_HANDLER, GsSetIds.P60_CORE) pack_generic_hk_listening_cmds( tc_queue=tc_queue, proc_key=key, sid=sid, diag=False ) if op_code in OpCodes.CORE_FT: key = KAI.CORE_FT[0] sid = make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK) pack_generic_hk_listening_cmds( tc_queue=tc_queue, proc_key=key, sid=sid, diag=False ) def listen_to_hk_for_x_seconds( tc_queue: TcQueueT, diag: bool, device: str, sid: bytes, interval_seconds: float, collection_time: float, ): """ enable_periodic_hk_command_with_interval( diag: bool, sid: bytes, interval_seconds: float, ssc: int """ """ function with periodic HK generation interval_seconds = at which rate HK is saved collection_time = how long the HK is saved for device = for which device the HK is saved functional_test = diagnostic Hk = yes diagnostic or no diagnostic sid = structural ID for specific device device Hk set ID """ tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}")) cmd_tuple = enable_periodic_hk_command_with_interval( diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0 ) for cmd in cmd_tuple: tc_queue.appendleft(cmd.pack_command_tuple()) tc_queue.appendleft((QueueCommands.WAIT, collection_time)) tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}")) tc_queue.appendleft( disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple() )