"""Hook function which packs telecommands based on service and operation code string """ import logging import os from collections import deque from typing import Union from spacepackets.ecss import PusTelecommand from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.config.definitions import CoreServiceList, QueueCommands from tmtccmd.logging import get_console_logger from tmtccmd.logging.pus import log_raw_pus_tc from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into from tmtccmd.pus.pus_17_test import pack_service_17_ping_command from tmtccmd.logging import get_current_time_string from pus_tc.service_200_mode import pack_service200_test_into from pus_tc.devs.p60dock import pack_p60dock_cmds from pus_tc.devs.pdu2 import pack_pdu2_commands from pus_tc.devs.pdu1 import pack_pdu1_commands from pus_tc.devs.bpx_batt import pack_bpx_commands from pus_tc.devs.acu import pack_acu_test_into from pus_tc.devs.solar_array_deployment import pack_solar_array_deployment_test_into from pus_tc.devs.imtq import pack_imtq_test_into from pus_tc.devs.tmp1075 import pack_tmp1075_test_into from pus_tc.devs.ploc_mpsoc import pack_ploc_mpsoc_commands from pus_tc.devs.ploc_supervisor import pack_ploc_supv_commands from pus_tc.devs.heater import pack_heater_cmds from pus_tc.devs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds from pus_tc.devs.rad_sensor import pack_rad_sensor_test_into from pus_tc.devs.ploc_memory_dumper import pack_ploc_memory_dumper_cmd from pus_tc.devs.ccsds_handler import pack_ccsds_handler_test from pus_tc.system.core import pack_core_commands from pus_tc.devs.star_tracker import pack_star_tracker_commands from pus_tc.devs.syrlinks_hk_handler import pack_syrlinks_command from pus_tc.devs.gps import pack_gps_command from pus_tc.system.time import pack_set_current_time_ascii_command from pus_tc.system.acs import pack_acs_command, pack_sus_cmds from pus_tc.devs.plpcdu import pack_pl_pcdu_commands from pus_tc.devs.str_img_helper import pack_str_img_helper_command from pus_tc.system.tcs import pack_tcs_sys_commands from pus_tc.system.proc import pack_proc_commands from pus_tc.system.controllers import pack_controller_commands from config.definitions import CustomServiceList from config.object_ids import ( P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID, CCSDS_HANDLER_ID, PDEC_HANDLER_ID, STR_IMG_HELPER_ID, SYRLINKS_HANDLER_ID, SOLAR_ARRAY_DEPLOYMENT_ID, RW_ASSEMBLY, ) LOGGER = get_console_logger() def pre_tc_send_cb( queue_entry: Union[bytes, QueueCommands], com_if: CommunicationInterface, queue_info: Union[PusTelecommand, any], file_logger: logging.Logger, ): if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray): log_raw_pus_tc( packet=queue_entry, srv_subservice=(queue_info.service, queue_info.subservice), ) tc_info_string = f"Sent {queue_info}" LOGGER.info(tc_info_string) file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}") com_if.send(data=queue_entry) elif isinstance(queue_entry, QueueCommands): if queue_entry == QueueCommands.PRINT: file_logger.info(queue_info) def pack_service_queue_user( service: Union[str, int], op_code: str, service_queue: TcQueueT ): if service == CoreServiceList.SERVICE_5.value: return pack_generic_service5_test_into(tc_queue=service_queue) if service == CoreServiceList.SERVICE_17.value: return service_queue.appendleft( pack_service_17_ping_command(ssc=1700).pack_command_tuple() ) if service == CoreServiceList.SERVICE_200.value: return pack_service200_test_into(tc_queue=service_queue) if service == CustomServiceList.P60DOCK.value: object_id = P60_DOCK_HANDLER return pack_p60dock_cmds( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU1.value: object_id = PDU_1_HANDLER_ID return pack_pdu1_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDU2.value: object_id = PDU_2_HANDLER_ID return pack_pdu2_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.ACU.value: object_id = ACU_HANDLER_ID return pack_acu_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.BPX_BATTERY.value: return pack_bpx_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TMP1075_1.value: object_id = TMP_1075_1_HANDLER_ID return pack_tmp1075_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.TMP1075_2.value: object_id = TMP_1075_2_HANDLER_ID return pack_tmp1075_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.HEATER.value: object_id = HEATER_ID return pack_heater_cmds( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.IMTQ.value: object_id = IMTQ_HANDLER_ID return pack_imtq_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PLOC_MPSOC.value: object_id = PLOC_MPSOC_ID return pack_ploc_mpsoc_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_1.value: return pack_single_rw_test_into( object_id=RW1_ID, rw_idx=1, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_2.value: return pack_single_rw_test_into( object_id=RW2_ID, rw_idx=2, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_3.value: return pack_single_rw_test_into( object_id=RW3_ID, rw_idx=3, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.REACTION_WHEEL_4.value: return pack_single_rw_test_into( object_id=RW4_ID, rw_idx=4, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.RAD_SENSOR.value: object_id = RAD_SENSOR_ID return pack_rad_sensor_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PLOC_SUPV.value: object_id = PLOC_SUPV_ID return pack_ploc_supv_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.STAR_TRACKER.value: object_id = STAR_TRACKER_ID return pack_star_tracker_commands( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.STR_IMG_HELPER.value: object_id = STR_IMG_HELPER_ID return pack_str_img_helper_command( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.CORE.value: return pack_core_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PLOC_MEMORY_DUMPER.value: object_id = PLOC_MEMORY_DUMPER_ID return pack_ploc_memory_dumper_cmd( object_id=object_id, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.ACS.value: return pack_acs_command(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.GPS_0.value: return pack_gps_command( object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.GPS_1.value: return pack_gps_command( object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.CCSDS_HANDLER.value: return pack_ccsds_handler_test( object_id=CCSDS_HANDLER_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.PDEC_HANDLER.value: return pack_ccsds_handler_test( object_id=PDEC_HANDLER_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.SYRLINKS.value: return pack_syrlinks_command( object_id=SYRLINKS_HANDLER_ID, tc_queue=service_queue, op_code=op_code ) if service == CustomServiceList.SA_DEPLYOMENT.value: return pack_solar_array_deployment_test_into( object_id=SOLAR_ARRAY_DEPLOYMENT_ID, tc_queue=service_queue ) if service == CustomServiceList.PROCEDURE.value: return pack_proc_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.SUS_ASS.value: return pack_sus_cmds(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.PL_PCDU.value: return pack_pl_pcdu_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.ACS_ASS.value: return pack_acs_command(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TCS_ASS.value: return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TIME.value: return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0) if service == CustomServiceList.RW_ASSEMBLY.value: return pack_rw_ass_cmds( tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code ) if service == CustomServiceList.CONTROLLERS.value: return pack_controller_commands(tc_queue=service_queue, op_code=op_code) LOGGER.warning("Invalid Service !") def create_total_tc_queue_user() -> TcQueueT: if not os.path.exists("log"): os.mkdir("log") tc_queue = deque() pack_generic_service5_test_into(tc_queue) tc_queue.appendleft(pack_service_17_ping_command(ssc=1700).pack_command_tuple()) return tc_queue