fsfw-example-tmtc-common/pus_tc/tc_packing.py

81 lines
3.4 KiB
Python
Raw Normal View History

2021-07-14 00:28:33 +02:00
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
2022-05-18 23:40:13 +02:00
import logging
2021-07-14 00:28:33 +02:00
import os
from collections import deque
from typing import Union
2022-05-18 23:40:13 +02:00
from spacepackets.ecss.tc import PusTelecommand
2022-05-20 11:08:46 +02:00
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
2022-05-18 23:40:13 +02:00
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.logging import get_console_logger, get_current_time_string
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into
from tmtccmd.pus.pus_17_test import pack_generic_service17_test
2021-07-14 00:28:33 +02:00
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
2022-05-20 11:08:46 +02:00
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
2021-07-14 00:28:33 +02:00
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
2022-05-20 11:08:46 +02:00
from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into
2021-07-14 00:28:33 +02:00
LOGGER = get_console_logger()
2022-05-18 23:40:13 +02:00
def pre_tc_send_cb(
    queue_entry: Union[bytes, QueueCommands],
    com_if: CommunicationInterface,
    queue_info: Union[PusTelecommand, any],
    file_logger: logging.Logger,
):
    """Process a single TC queue entry: send raw packets, log PRINT commands.

    :param queue_entry: Either a raw packet (``bytes`` or ``bytearray``) or a
        :class:`QueueCommands` member.
    :param com_if: Communication interface; its ``send`` method is called with
        the raw packet.
    :param queue_info: Companion object of the entry. For raw packets its
        ``service`` and ``subservice`` attributes are read and its string form
        is logged; for ``QueueCommands.PRINT`` it is logged as-is.
    :param file_logger: Logger receiving the timestamped "Sent ..." line for
        raw packets and the payload of PRINT entries.
    :return: Always ``None``. Entries that are neither raw packets nor
        ``QueueCommands.PRINT`` are ignored.
    """
    # NOTE(review): ``any`` in the ``queue_info`` annotation is the builtin
    # function; ``typing.Any`` was presumably intended - confirm before changing.
    if isinstance(queue_entry, (bytes, bytearray)):
        log_raw_pus_tc(
            packet=queue_entry,
            srv_subservice=(queue_info.service, queue_info.subservice),
        )
        sent_msg = f"Sent {queue_info}"
        LOGGER.info(sent_msg)
        file_logger.info(f"{get_current_time_string(True)}: {sent_msg}")
        com_if.send(data=queue_entry)
        return

    is_queue_cmd = isinstance(queue_entry, QueueCommands)
    if is_queue_cmd and queue_entry == QueueCommands.PRINT:
        file_logger.info(queue_info)
2022-05-20 11:08:46 +02:00
def common_service_queue_user(
    service: Union[str, int], op_code: str, tc_queue: TcQueueT
):
    """Dispatch to the packing function of the requested service.

    :param service: Service identifier, compared against the ``value`` of the
        supported :class:`CoreServiceList` members.
    :param op_code: Operation code forwarded to the packers that accept one
        (the service 5 packer only receives the queue).
    :param tc_queue: Telecommand queue handed to the selected packer.
    :return: Whatever the selected packer returns, or ``None`` after logging a
        warning when ``service`` matches none of the supported services.
    """
    # Ordered (service, thunk) pairs: the first matching service wins and only
    # its packer is invoked.
    dispatch_table = (
        (
            CoreServiceList.SERVICE_2,
            lambda: pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue),
        ),
        (
            CoreServiceList.SERVICE_3,
            lambda: pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue),
        ),
        (
            CoreServiceList.SERVICE_5,
            lambda: pack_generic_service5_test_into(tc_queue=tc_queue),
        ),
        (
            CoreServiceList.SERVICE_8,
            lambda: pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue),
        ),
        (
            CoreServiceList.SERVICE_11,
            lambda: pack_service_11_commands(op_code=op_code, tc_queue=tc_queue),
        ),
        (
            CoreServiceList.SERVICE_17,
            lambda: pack_service_17_commands(
                op_code=op_code, tc_queue=tc_queue, init_ssc=0
            ),
        ),
        (
            CoreServiceList.SERVICE_20,
            lambda: pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code),
        ),
        (
            CoreServiceList.SERVICE_200,
            lambda: pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code),
        ),
    )
    for known_service, run_packer in dispatch_table:
        if service == known_service.value:
            return run_packer()
    LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT:
    """Build the default telecommand queue.

    Ensures a ``log`` directory exists in the current working directory, then
    fills a fresh ``deque`` with the service 2 commands for op code ``"0"``
    and the generic service 17 test (initial SSC 1700).

    :return: The populated telecommand queue.
    """
    # Create the directory directly instead of checking first: a separate
    # existence check can race with another process creating the same path.
    try:
        os.mkdir("log")
    except FileExistsError:
        pass
    tc_queue = deque()
    pack_service_2_commands_into(op_code="0", tc_queue=tc_queue)
    pack_generic_service17_test(init_ssc=1700, tc_queue=tc_queue)
    return tc_queue