210 lines
8.2 KiB
Python
Raw Normal View History

2022-07-03 20:58:32 +02:00
import logging
2022-05-18 23:40:13 +02:00
import sys
2022-07-03 20:58:32 +02:00
from tmtccmd.com_if import ComInterface
from tmtccmd.logging import get_current_time_string
2022-07-27 20:43:52 +02:00
from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices
2022-07-27 14:40:25 +02:00
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
2022-05-18 23:40:13 +02:00
# Fail early with a readable hint when a required third-party package is missing.
try:
    import spacepackets
except ImportError as error:
    print(error)
    # The hint names the package that actually failed to import.
    print("Python spacepackets module could not be imported. Make sure it is installed")
    sys.exit(1)

# NOTE(review): `tmtccmd` submodules are already imported further up in this file,
# so a missing installation would presumably surface there before this guard runs.
try:
    import tmtccmd
except ImportError as error:
    print(error)
    print("Python tmtccmd module could not be imported. Make sure it is installed")
    sys.exit(1)
2022-07-27 20:43:52 +02:00
from spacepackets.ecss import PusVerificator, PusTelecommand
2022-07-03 20:58:32 +02:00
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
from examples.tmtcc import EXAMPLE_APID
from tmtccmd import TcHandlerBase, get_console_logger, TmTcCfgHookBase, BackendBase
2022-07-27 14:40:25 +02:00
from tmtccmd.pus import VerificationWrapper
2022-07-03 20:58:32 +02:00
from tmtccmd.tc import (
ProcedureHelper,
FeedWrapper,
TcProcedureType,
QueueEntryHelper,
2022-07-27 20:43:52 +02:00
TcQueueEntryType,
SendCbParams,
2022-07-03 20:58:32 +02:00
)
from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into
from tmtccmd.tm import SpecificApidHandlerBase, CcsdsTmHandler
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.config import CoreServiceList, SetupWrapper, SetupParams, ArgParserWrapper
2022-05-18 23:40:13 +02:00
from common_tmtc.config import __version__
2022-07-03 20:58:32 +02:00
from common_tmtc.pus_tm.factory_hook import pus_factory_hook
LOGGER = get_console_logger()
class PusHandler(SpecificApidHandlerBase):
    """TM handler registered for ``EXAMPLE_APID`` which hands packets to ``pus_factory_hook``."""

    def __init__(
        self,
        wrapper: VerificationWrapper,
        printer: FsfwTmTcPrinter,
        raw_logger: RawTmtcTimedLogWrapper,
    ):
        """Keep references to the helpers passed along with every received packet.

        :param wrapper: Verification wrapper forwarded to the factory hook.
        :param printer: Printer forwarded to the factory hook.
        :param raw_logger: Raw TM/TC logger forwarded to the factory hook.
        """
        super().__init__(EXAMPLE_APID, None)
        self.verif_wrapper = wrapper
        self.raw_logger = raw_logger
        self.printer = printer

    def handle_tm(self, packet: bytes, _user_args: any):
        """Pass the raw packet and the stored helpers to ``pus_factory_hook``.

        :param packet: Raw telemetry packet.
        :param _user_args: Not used by this handler.
        """
        hook_kwargs = {
            "packet": packet,
            "wrapper": self.verif_wrapper,
            "raw_logger": self.raw_logger,
            "printer": self.printer,
        }
        pus_factory_hook(**hook_kwargs)
2022-05-18 23:40:13 +02:00
2022-07-03 20:58:32 +02:00
class TcHandler(TcHandlerBase):
    """Telecommand handler: fills the TC queue for a requested service and sends PUS TCs."""

    def __init__(
        self,
        seq_count_provider: FileSeqCountProvider,
        pus_verificator: PusVerificator,
        file_logger: logging.Logger,
        raw_logger: RawTmtcTimedLogWrapper,
    ):
        """Store the collaborators used while sending telecommands.

        :param seq_count_provider: Source of the sequence count stamped onto each PUS TC.
        :param pus_verificator: Verificator to which every sent TC is added.
        :param file_logger: Logger receiving a time-stamped "Sent ..." line per TC.
        :param raw_logger: Raw logger receiving every sent TC.
        """
        super().__init__()
        self.seq_count_provider = seq_count_provider
        self.pus_verificator = pus_verificator
        self.file_logger = file_logger
        self.raw_logger = raw_logger

    def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
        """Pack the commands for the requested service into the queue helper.

        Only procedures of type ``TcProcedureType.DEFAULT`` are handled. Their
        service selects the packing function and the op code is passed along.
        An unknown service only triggers a warning.

        :param info: Procedure helper describing what should be queued.
        :param wrapper: Feed wrapper providing the ``queue_helper`` to fill.
        :return: Whatever the selected packing function returns, otherwise ``None``.
        """
        if info.proc_type == TcProcedureType.DEFAULT:
            def_proc = info.to_def_procedure()
            service = def_proc.service
            op_code = def_proc.op_code
            if service == CoreServiceList.SERVICE_2.value:
                return pack_service_2_commands_into(
                    op_code=op_code, q=wrapper.queue_helper
                )
            if service == CoreServiceList.SERVICE_3.value:
                return pack_service_3_commands_into(
                    op_code=op_code, q=wrapper.queue_helper
                )
            if service == CoreServiceList.SERVICE_5.value:
                return pack_generic_service_5_test_into(q=wrapper.queue_helper)
            if service == CoreServiceList.SERVICE_8.value:
                return pack_service_8_commands_into(
                    op_code=op_code, q=wrapper.queue_helper
                )
            if service == CoreServiceList.SERVICE_11.value:
                return pack_service_11_commands(op_code=op_code, q=wrapper.queue_helper)
            if service == CoreServiceList.SERVICE_17.value:
                return pack_service_17_commands(op_code=op_code, q=wrapper.queue_helper)
            if service == CoreServiceList.SERVICE_20.value:
                return pack_service20_commands_into(
                    q=wrapper.queue_helper, op_code=op_code
                )
            if service == CoreServiceList.SERVICE_200.value:
                return pack_service_200_commands_into(
                    q=wrapper.queue_helper, op_code=op_code
                )
            LOGGER.warning("Invalid Service !")

    def send_cb(self, params: SendCbParams):
        """Stamp, register, log and send a queued PUS telecommand.

        Queue entries which are not telecommands, or which are not of type
        ``TcQueueEntryType.PUS_TC``, are ignored.

        :param params: Send parameters providing the queue ``entry`` and the ``com_if``.
        """
        if not params.entry.is_tc:
            return
        if params.entry.entry_type != TcQueueEntryType.PUS_TC:
            return
        pus_tc_wrapper = params.entry.to_pus_tc_entry()
        # TODO: All of this stuff should be done during queue insertion time
        #       This requires the queue helper to be optionally able to perform TC
        #       post-processing via a callback or something similar. Then an API can be
        #       added which is also able to pack time tagged TCs and stamping both TCs
        pus_tc_wrapper.pus_tc.seq_count = self.seq_count_provider.get_and_increment()
        pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID
        if (
            pus_tc_wrapper.pus_tc.service == 11
            and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT
        ):
            # The application data after the first 4 bytes is unpacked as a TC of its
            # own (presumably the 4 bytes are the release time - TODO confirm) and
            # registered as well, so that it can be verified too.
            try:
                pus_tc = PusTelecommand.unpack(pus_tc_wrapper.pus_tc.app_data[4:])
                self.pus_verificator.add_tc(pus_tc)
            except ValueError as e:
                LOGGER.warning(
                    f"Attempt of unpacking time tagged TC failed with exception {e}"
                )
        # Add TC after Sequence Count stamping
        self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
        raw_tc = pus_tc_wrapper.pus_tc.pack()
        self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
        tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
        LOGGER.info(tc_info_string)
        self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
        params.com_if.send(raw_tc)

    def queue_finished_cb(self, info: ProcedureHelper):
        """Log that the queue of a default procedure has been fully processed.

        :param info: Procedure helper of the finished queue, may be ``None``.
        """
        # The procedure type is compared against ``TcProcedureType`` exactly like in
        # ``feed_cb``. Comparing it against a ``TcQueueEntryType`` member mixes two
        # unrelated enumerations.
        if info is None or info.proc_type != TcProcedureType.DEFAULT:
            return
        def_proc = info.to_def_procedure()
        LOGGER.info(
            f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
        )
def setup_params(hook_obj: TmTcCfgHookBase) -> SetupWrapper:
    """Print the banner, parse the command line and build the setup wrapper.

    :param hook_obj: Hook object handed to the argument parser and the setup wrapper.
    :return: Setup wrapper combining ``hook_obj`` with the parsed setup parameters.
    """
    print("-- eive TMTC Commander --")
    print(f"-- spacepackets v{spacepackets.__version__} --")
    setup_cfg = SetupParams()
    arg_parser = ArgParserWrapper(hook_obj)
    arg_parser.parse()
    tmtccmd.init_printout(arg_parser.use_gui)
    # Let the parser fill in the parameters first, then pin the APID.
    arg_parser.set_params(setup_cfg)
    setup_cfg.apid = EXAMPLE_APID
    return SetupWrapper(hook_obj=hook_obj, setup_params=setup_cfg)
def setup_tmtc_handlers(
    verif_wrapper: VerificationWrapper,
    printer: FsfwTmTcPrinter,
    raw_logger: RawTmtcTimedLogWrapper,
) -> (CcsdsTmHandler, TcHandler):
    """Create the CCSDS TM handler and the TC handler.

    :param verif_wrapper: Verification wrapper; its ``pus_verificator`` is shared
        with the TC handler.
    :param printer: Printer; its ``file_logger`` is used by the TC handler.
    :param raw_logger: Raw TM/TC logger used by both handlers.
    :return: Tuple of the CCSDS TM handler and the TC handler.
    """
    tm_handler = CcsdsTmHandler(None)
    # Register the PUS handler so that it receives the packets for its APID.
    tm_handler.add_apid_handler(
        PusHandler(printer=printer, raw_logger=raw_logger, wrapper=verif_wrapper)
    )
    telecommand_handler = TcHandler(
        file_logger=printer.file_logger,
        raw_logger=raw_logger,
        pus_verificator=verif_wrapper.pus_verificator,
        seq_count_provider=PusFileSeqCountProvider(),
    )
    return tm_handler, telecommand_handler
def setup_backend(
    setup_wrapper: SetupWrapper,
    tc_handler: TcHandler,
    ccsds_handler: CcsdsTmHandler,
) -> BackendBase:
    """Create the default TMTC backend and start ``tmtccmd`` with it.

    :param setup_wrapper: Setup wrapper; also provides the hook object used for the start.
    :param tc_handler: Telecommand handler of the backend.
    :param ccsds_handler: Telemetry handler of the backend.
    :return: The created backend.
    """
    backend = tmtccmd.create_default_tmtc_backend(
        setup_wrapper=setup_wrapper,
        tm_handler=ccsds_handler,
        tc_handler=tc_handler,
    )
    tmtccmd.start(tmtc_backend=backend, hook_obj=setup_wrapper.hook_obj)
    return backend