from gomspace.gomspace_common import GsInfo, GomspaceOpCodes from tmtc.power.common_power import ( PowerOpCodes, PowerInfo, add_gomspace_cmd_defs, pack_reset_gnd_wdt_cmd, ) from config.definitions import CustomServiceList from config.object_ids import ( P60_DOCK_HANDLER, ACU_HANDLER_ID, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, get_object_ids, ) from tmtc.power.pdu1 import pdu1_req_hk_cmds, pdu1_cmds, add_pdu1_common_defs from tmtc.power.pdu2 import pdu2_req_hk_cmds, add_pdu2_common_defs, pdu2_cmds from tmtccmd import get_console_logger from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtc.power.p60dock import P60OpCodes, P60Info, p60_dock_req_hk_cmds from tmtc.power.acu import add_acu_cmds, acu_req_hk_cmds from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.tc import DefaultPusQueueHelper LOGGER = get_console_logger() def pack_power_commands(q: DefaultPusQueueHelper, op_code: str): pdu1_cmds(q, op_code) pdu2_cmds(q, op_code) if op_code in PowerOpCodes.INFO_CORE: pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.INFO_AUX: pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.INFO_ALL: pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu1_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) pdu2_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) p60_dock_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_CORE_HK_ONCE[0]) acu_req_hk_cmds(q, GomspaceOpCodes.REQUEST_AUX_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.RESET_ALL_GND_WDTS: oids = get_object_ids() pack_reset_gnd_wdt_cmd(q, "P60 Dock", oids[P60_DOCK_HANDLER]) pack_reset_gnd_wdt_cmd(q, "ACU", oids[ACU_HANDLER_ID]) pack_reset_gnd_wdt_cmd(q, "PDU1", oids[PDU_1_HANDLER_ID]) pack_reset_gnd_wdt_cmd(q, "PDU2", oids[PDU_2_HANDLER_ID]) if q.empty(): LOGGER.info(f"Queue is empty, no stack for op code {op_code}") @tmtc_definitions_provider def add_p60_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=P60OpCodes.STACK_3V3_ON, info=P60Info.STACK_3V3_ON) oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=P60Info.STACK_3V3_OFF) oce.add(keys=P60OpCodes.STACK_5V_ON, info=P60Info.STACK_5V_ON) oce.add(keys=P60OpCodes.STACK_5V_OFF, info=P60Info.STACK_5V_OFF) add_gomspace_cmd_defs(oce) oce.add(keys=P60OpCodes.TEST, info="P60 Tests") defs.add_service( name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce ) @tmtc_definitions_provider def add_power_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() add_pdu1_common_defs(oce) add_pdu2_common_defs(oce) oce.add(keys=PowerOpCodes.INFO_ALL, info=PowerInfo.INFO_ALL) oce.add(keys=PowerOpCodes.INFO_CORE, info=PowerInfo.INFO_CORE) oce.add(keys=PowerOpCodes.INFO_AUX, info=PowerInfo.INFO_AUX) oce.add(keys=PowerOpCodes.RESET_ALL_GND_WDTS, info=PowerInfo.RESET_ALL_GND_WDTS) defs.add_service( name=CustomServiceList.POWER.value, info="Power Subsystem", op_code_entry=oce, ) @tmtc_definitions_provider def add_pdu1_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() add_pdu1_common_defs(oce) add_gomspace_cmd_defs(oce) oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE) oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE) oce.add( keys=GomspaceOpCodes.PRINT_SWITCH_V_I, info="PDU1: Print Switches, Voltages, Currents", ) oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER) defs.add_service( name=CustomServiceList.PDU1.value, info="PDU1 Device", op_code_entry=oce, ) @tmtc_definitions_provider def add_pdu2_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() add_pdu2_common_defs(oce) add_gomspace_cmd_defs(oce) oce.add( keys=GomspaceOpCodes.PRINT_SWITCH_V_I, info="PDU2: Print Switches, Voltages, Currents", ) oce.add( keys=GomspaceOpCodes.PRINT_LATCHUPS, info="PDU2: Print Latchups", ) defs.add_service( name="pdu2", info="PDU2 Device", op_code_entry=oce, ) def add_pcdu_cmds(defs: TmtcDefinitionWrapper): add_p60_cmds(defs) add_pdu1_cmds(defs) add_pdu2_cmds(defs) add_acu_cmds(defs)