""" @brief This file transfers control of TC packing to the user @details Template configuration file. Copy this folder to the TMTC commander root and adapt it to your needs. """ import os from collections import deque from typing import Union from tmtccmd.config.definitions import CoreServiceList from tmtccmd.utility.logger import get_console_logger from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.service_5_event import pack_generic_service5_test_into from tmtccmd.tc.service_17_test import pack_service17_ping_command from pus_tc.service_200_mode import pack_service200_test_into from pus_tc.p60dock import pack_p60dock_test_into from pus_tc.pdu2 import pack_pdu2_test_into from pus_tc.pdu1 import pack_pdu1_test_into from pus_tc.acu import pack_acu_test_into from pus_tc.imtq import pack_imtq_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into from pus_tc.ploc_mpsoc import pack_ploc_mpsoc_test_into from pus_tc.ploc_supervisor import pack_ploc_supv_test_into from pus_tc.heater import pack_heater_test_into from pus_tc.reaction_wheels import pack_single_rw_test_into from pus_tc.rad_sensor import pack_rad_sensor_test_into from pus_tc.core import pack_core_commands from config.definitions import CustomServiceList from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ RAD_SENSOR_ID, PLOC_SUPV_ID LOGGER = get_console_logger() def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT): if service == CoreServiceList.SERVICE_5.value: return pack_generic_service5_test_into(tc_queue=service_queue) if service == CoreServiceList.SERVICE_17.value: return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) if service == CoreServiceList.SERVICE_200.value: return pack_service200_test_into(tc_queue=service_queue) if service == CustomServiceList.P60DOCK.value: object_id = P60_DOCK_HANDLER return pack_p60dock_test_into(object_id=object_id, tc_queue=service_queue) if service == CustomServiceList.PDU1.value: pdu1_object_id = PDU_1_HANDLER_ID p60dock_object_id = P60_DOCK_HANDLER return pack_pdu1_test_into( pdu1_object_id=pdu1_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue ) if service == CustomServiceList.PDU2.value: pdu2_object_id = PDU_2_HANDLER_ID p60dock_object_id = P60_DOCK_HANDLER return pack_pdu2_test_into( pdu2_object_id=pdu2_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue ) if service == CustomServiceList.ACU.value: object_id = ACU_HANDLER_ID return pack_acu_test_into(object_id=object_id, tc_queue=service_queue) if service == CustomServiceList.TMP1075_1.value: object_id = TMP_1075_1_HANDLER_ID return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TMP1075_2.value: object_id = TMP_1075_2_HANDLER_ID return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.HEATER.value: object_id = HEATER_ID return pack_heater_test_into(object_id=object_id, tc_queue=service_queue) if service == CustomServiceList.IMTQ.value: object_id = IMTQ_HANDLER_ID return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PLOC.value: object_id = PLOC_MPSOC_ID return pack_ploc_mpsoc_test_into(object_id=object_id, tc_queue=service_queue) if service == CustomServiceList.REACTION_WHEEL_1.value: object_id = RW1_ID return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.REACTION_WHEEL_2.value: object_id = RW2_ID return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.REACTION_WHEEL_3.value: object_id = RW3_ID return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.REACTION_WHEEL_4.value: object_id = RW4_ID return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.RAD_SENSOR.value: object_id = RAD_SENSOR_ID return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PLOC_SUPV.value: object_id = PLOC_SUPV_ID return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.CORE.value: return pack_core_commands(tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !") def create_total_tc_queue_user() -> TcQueueT: if not os.path.exists("log"): os.mkdir("log") tc_queue = deque() pack_generic_service5_test_into(tc_queue) tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) return tc_queue