eive-tmtc/pus_tc/tc_packer_hook.py

73 lines
3.1 KiB
Python

"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=service_queue)
if service == CoreServiceList.SERVICE_17.value:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == CustomServiceList.P60DOCK.value:
object_id = P60_DOCK_HANDLER
return pack_p60dock_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.PDU1.value:
pdu1_object_id = PDU_1_HANDLER_ID
p60dock_object_id = P60_DOCK_HANDLER
return pack_pdu1_test_into(
pdu1_object_id=pdu1_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue
)
if service == CustomServiceList.PDU2.value:
pdu2_object_id = PDU_2_HANDLER_ID
p60dock_object_id = P60_DOCK_HANDLER
return pack_pdu2_test_into(
pdu2_object_id=pdu2_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue
)
if service == CustomServiceList.ACU.value:
object_id = ACU_HANDLER_ID
return pack_acu_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.TMP1075_1.value:
object_id = TMP_1075_1_HANDLER_ID
return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TMP1075_2.value:
object_id = TMP_1075_2_HANDLER_ID
return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.HEATER.value:
object_id = HEATER_ID
return pack_heater_test_into(object_id=object_id, tc_queue=service_queue)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
pack_generic_service5_test_into(tc_queue)
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
return tc_queue