eive-tmtc/pus_tc/tc_packer_hook.py

73 lines
3.1 KiB
Python
Raw Normal View History

2020-12-17 17:50:00 +01:00
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
2021-03-19 17:39:52 +01:00
from typing import Union
2021-05-17 17:42:04 +02:00
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.utility.logger import get_logger
2021-04-10 22:59:51 +02:00
from tmtccmd.pus_tc.definitions import TcQueueT
2021-03-19 17:39:52 +01:00
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
2021-05-17 17:42:04 +02:00
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID
2020-12-17 17:50:00 +01:00
LOGGER = get_logger()
2021-03-19 17:39:52 +01:00
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
    """Fill ``service_queue`` with the telecommands for the requested service.

    ``service`` is compared, in order, against the values of the known core
    and custom service identifiers. The first match delegates to the
    corresponding packing helper and hands back whatever that helper returns.
    ``op_code`` is forwarded only to the TMP1075 packing helper. When no
    identifier matches, a warning is logged and the function falls through
    without an explicit return value.

    :param service: Service identifier to match against the service lists.
    :param op_code: Operation code, passed on for the TMP1075 services only.
    :param service_queue: Queue that receives the packed telecommands.
    """
    if service == CoreServiceList.SERVICE_5.value:
        return pack_generic_service5_test_into(tc_queue=service_queue)

    if service == CoreServiceList.SERVICE_17.value:
        ping_tuple = pack_service17_ping_command(ssc=1700).pack_command_tuple()
        return service_queue.appendleft(ping_tuple)

    if service == CustomServiceList.P60DOCK.value:
        return pack_p60dock_test_into(object_id=P60_DOCK_HANDLER, tc_queue=service_queue)

    if service == CustomServiceList.PDU1.value:
        return pack_pdu1_test_into(
            pdu1_object_id=PDU_1_HANDLER_ID,
            p60dock_object_id=P60_DOCK_HANDLER,
            tc_queue=service_queue,
        )

    if service == CustomServiceList.PDU2.value:
        return pack_pdu2_test_into(
            pdu2_object_id=PDU_2_HANDLER_ID,
            p60dock_object_id=P60_DOCK_HANDLER,
            tc_queue=service_queue,
        )

    if service == CustomServiceList.ACU.value:
        return pack_acu_test_into(object_id=ACU_HANDLER_ID, tc_queue=service_queue)

    # Both TMP1075 sensors share one packing helper and differ only in the
    # handler object ID that is passed along.
    if service == CustomServiceList.TMP1075_1.value:
        return pack_tmp1075_test_into(
            object_id=TMP_1075_1_HANDLER_ID, tc_queue=service_queue, op_code=op_code
        )

    if service == CustomServiceList.TMP1075_2.value:
        return pack_tmp1075_test_into(
            object_id=TMP_1075_2_HANDLER_ID, tc_queue=service_queue, op_code=op_code
        )

    if service == CustomServiceList.HEATER.value:
        return pack_heater_test_into(object_id=HEATER_ID, tc_queue=service_queue)

    LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
    """Build and return a queue holding the default set of telecommands.

    Makes sure a ``log`` directory exists in the current working directory,
    then creates a fresh deque, lets ``pack_generic_service5_test_into`` fill
    it and finally pushes a service 17 ping command tuple onto the left end.

    :return: The populated telecommand queue.
    """
    # Attempt the creation directly instead of checking for existence first:
    # a separate exists() check can race with another process creating the
    # directory in between, which would make mkdir() raise.
    try:
        os.mkdir("log")
    except FileExistsError:
        pass

    tc_queue = deque()
    pack_generic_service5_test_into(tc_queue)
    tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
    return tc_queue