from tmtc.power.common_power import ( PowerOpCodes, PowerInfo, add_gomspace_cmd_defs, pack_reset_gnd_wdt_cmd, ) from config.definitions import CustomServiceList from config.object_ids import ( P60_DOCK_HANDLER, ACU_HANDLER_ID, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, get_object_ids, ) from tmtc.power.pdu1 import ( pdu1_req_hk_cmds, pdu1_switch_cmds, add_pdu1_common_defs, add_pdu1_cmds, ) from tmtc.power.pdu2 import ( pdu2_req_hk_cmds, add_pdu2_common_defs, pdu2_cmds, add_pdu2_cmds, ) from tmtccmd import get_console_logger from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtc.power.p60dock import P60OpCodes, P60Info, p60_dock_req_hk_cmds from tmtc.power.acu import add_acu_cmds, acu_req_hk_cmds from tmtccmd.config.tmtc import tmtc_definitions_provider from tmtccmd.tc import DefaultPusQueueHelper LOGGER = get_console_logger() def pack_power_commands(q: DefaultPusQueueHelper, op_code: str): pdu1_switch_cmds(q, op_code) pdu2_cmds(q, op_code) if op_code in PowerOpCodes.INFO_CORE: pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) acu_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.INFO_AUX: pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) acu_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.INFO_ALL: pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) p60_dock_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) acu_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0]) acu_req_hk_cmds(q, PowerOpCodes.REQUEST_AUX_HK_ONCE[0]) q.add_wait_seconds(8.0) elif op_code in PowerOpCodes.RESET_ALL_GND_WDTS: oids = get_object_ids() pack_reset_gnd_wdt_cmd(q, "P60 Dock", oids[P60_DOCK_HANDLER]) pack_reset_gnd_wdt_cmd(q, "ACU", oids[ACU_HANDLER_ID]) pack_reset_gnd_wdt_cmd(q, "PDU1", oids[PDU_1_HANDLER_ID]) pack_reset_gnd_wdt_cmd(q, "PDU2", oids[PDU_2_HANDLER_ID]) q.add_wait_seconds(5.0) if q.empty(): LOGGER.info(f"Queue is empty, no stack for op code {op_code}") @tmtc_definitions_provider def add_p60_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(keys=P60OpCodes.STACK_3V3_ON, info=P60Info.STACK_3V3_ON) oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=P60Info.STACK_3V3_OFF) oce.add(keys=P60OpCodes.STACK_5V_ON, info=P60Info.STACK_5V_ON) oce.add(keys=P60OpCodes.STACK_5V_OFF, info=P60Info.STACK_5V_OFF) add_gomspace_cmd_defs(oce) oce.add(keys=P60OpCodes.TEST, info="P60 Tests") defs.add_service( name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce ) @tmtc_definitions_provider def add_power_cmd_defs(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() add_pdu1_common_defs(oce) add_pdu2_common_defs(oce) oce.add(keys=PowerOpCodes.INFO_ALL, info=PowerInfo.INFO_ALL) oce.add(keys=PowerOpCodes.INFO_CORE, info=PowerInfo.INFO_CORE) oce.add(keys=PowerOpCodes.INFO_AUX, info=PowerInfo.INFO_AUX) oce.add(keys=PowerOpCodes.RESET_ALL_GND_WDTS, info=PowerInfo.RESET_ALL_GND_WDTS) defs.add_service( name=CustomServiceList.POWER.value, info="Power Subsystem", op_code_entry=oce, ) def add_pcdu_cmds(defs: TmtcDefinitionWrapper): add_p60_cmds(defs) add_pdu1_cmds(defs) add_pdu2_cmds(defs) add_acu_cmds(defs)