eive-tmtc/tmtcc.py

242 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python3
import logging
import sys
import time
import traceback
from tmtccmd.tc.handler import SendCbParams
try:
import spacepackets
except ImportError as error:
print(error)
print("Python spacepackets module could not be imported")
print(
'Install with "cd spacepackets && python3 -m pip intall -e ." for interative installation'
)
sys.exit(1)
try:
import tmtccmd
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
tb = traceback.format_exc()
print(tb)
print("Python tmtccmd submodule could not be imported")
sys.exit(1)
from spacepackets.ecss import PusVerificator
from tmtccmd import get_console_logger, TcHandlerBase, BackendBase
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging.pus import (
RawTmtcTimedLogWrapper,
RegularTmtcLogWrapper,
TimedLogWhen,
)
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase, CcsdsTmHandler
from tmtccmd.core import BackendRequest
from tmtccmd.logging import get_current_time_string
from tmtccmd.tc import (
ProcedureHelper,
FeedWrapper,
TcProcedureType,
TcQueueEntryType,
DefaultPusQueueHelper,
)
from tmtccmd.config import default_json_path, SetupWrapper
from tmtccmd.config.args import (
SetupParams,
ArgParserWrapper,
)
from config import __version__
from config.definitions import PUS_APID
from config.hook import EiveHookObject
from pus_tm.factory_hook import pus_factory_hook
from pus_tc.procedure_packer import handle_default_procedure
LOGGER = get_console_logger()
# Put rotating file logger parameters here for quick changes
ROTATING_TIMED_LOGGER_INTERVAL_WHEN = TimedLogWhen.PER_MINUTE
ROTATING_TIMED_LOGGER_INTERVAL = 30
class PusHandler(SpecificApidHandlerBase):
def __init__(
self,
wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
):
super().__init__(PUS_APID, None)
self.printer = printer
self.verif_wrapper = wrapper
self.raw_logger = raw_logger
def handle_tm(self, packet: bytes, _user_args: any):
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger)
class UnknownApidHandler(GenericApidHandlerBase):
def handle_tm(self, apid: int, _packet: bytes, _user_args: any):
LOGGER.warning(f"Packet with unknwon APID {apid} detected")
class TcHandler(TcHandlerBase):
def __init__(
self,
seq_count_provider: FileSeqCountProvider,
pus_verificator: PusVerificator,
file_logger: logging.Logger,
raw_logger: RawTmtcTimedLogWrapper,
gui: bool,
):
super().__init__()
self.seq_count_provider = seq_count_provider
self.pus_verificator = pus_verificator
self.file_logger = file_logger
self.raw_logger = raw_logger
self.gui = gui
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=None,
pus_apid=PUS_APID,
seq_cnt_provider=seq_count_provider,
pus_verificator=pus_verificator,
)
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT:
handle_default_procedure(
info.to_def_procedure(), self.queue_helper, self.gui
)
def send_cb(self, send_params: SendCbParams):
entry_helper = send_params.entry
if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
pus_tc_wrapper.pus_tc.seq_count = (
self.seq_count_provider.get_and_increment()
)
pus_tc_wrapper.pus_tc.apid = PUS_APID
# Add TC after Sequence Count stamping
self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
raw_tc = pus_tc_wrapper.pus_tc.pack()
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
LOGGER.info(tc_info_string)
self.file_logger.info(
f"{get_current_time_string(True)}: {tc_info_string}"
)
send_params.com_if.send(raw_tc)
elif entry_helper.entry_type == TcQueueEntryType.LOG:
log_entry = entry_helper.to_log_entry()
LOGGER.info(log_entry.log_str)
self.file_logger.info(log_entry.log_str)
def queue_finished_cb(self, info: ProcedureHelper):
if info is not None and info.proc_type == TcQueueEntryType.PUS_TC:
def_proc = info.to_def_procedure()
LOGGER.info(
f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
)
def setup_params() -> SetupWrapper:
print(f"-- eive tmtc v{__version__} --")
print(f"-- spacepackets v{spacepackets.__version__} --")
hook_obj = EiveHookObject(default_json_path())
params = SetupParams()
parser_wrapper = ArgParserWrapper(hook_obj)
parser_wrapper.parse()
tmtccmd.init_printout(parser_wrapper.use_gui)
parser_wrapper.set_params(params)
params.apid = PUS_APID
setup_wrapper = SetupWrapper(hook_obj=hook_obj, setup_params=params)
return setup_wrapper
def setup_tmtc(
verificator: PusVerificator,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
gui: bool,
) -> (CcsdsTmHandler, TcHandler):
verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger)
pus_handler = PusHandler(verification_wrapper, printer, raw_logger)
ccsds_handler = CcsdsTmHandler(generic_handler=UnknownApidHandler(None))
ccsds_handler.add_apid_handler(pus_handler)
seq_count_provider = PusFileSeqCountProvider()
tc_handler = TcHandler(
seq_count_provider=seq_count_provider,
pus_verificator=verificator,
file_logger=printer.file_logger,
raw_logger=raw_logger,
gui=gui,
)
return ccsds_handler, tc_handler
def setup_backend(
setup_wrapper: SetupWrapper,
tc_handler: TcHandler,
ccsds_handler: CcsdsTmHandler,
) -> BackendBase:
tmtc_backend = tmtccmd.create_default_tmtc_backend(
setup_wrapper=setup_wrapper, tm_handler=ccsds_handler, tc_handler=tc_handler
)
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj)
return tmtc_backend
def main():
try:
setup_wrapper = setup_params()
except KeyboardInterrupt as e:
LOGGER.info(f"{e}. Exiting")
sys.exit(0)
tmtc_logger = RegularTmtcLogWrapper()
printer = FsfwTmTcPrinter(tmtc_logger.logger)
raw_logger = RawTmtcTimedLogWrapper(
when=ROTATING_TIMED_LOGGER_INTERVAL_WHEN,
interval=ROTATING_TIMED_LOGGER_INTERVAL
)
pus_verificator = PusVerificator()
ccsds_handler, tc_handler = setup_tmtc(
pus_verificator, printer, raw_logger, setup_wrapper.params.use_gui
)
tmtccmd.setup(setup_wrapper)
tmtc_backend = setup_backend(
setup_wrapper=setup_wrapper, ccsds_handler=ccsds_handler, tc_handler=tc_handler
)
try:
while True:
state = tmtc_backend.periodic_op(None)
if state.request == BackendRequest.TERMINATION_NO_ERROR:
sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE:
LOGGER.info("TMTC Client in IDLE mode")
time.sleep(3.0)
elif state.request == BackendRequest.DELAY_LISTENER:
time.sleep(0.5)
elif state.request == BackendRequest.DELAY_CUSTOM:
if state.next_delay.total_seconds() < 0.5:
time.sleep(state.next_delay.total_seconds())
else:
time.sleep(0.5)
elif state.request == BackendRequest.CALL_NEXT:
pass
except KeyboardInterrupt:
sys.exit(0)
if __name__ == "__main__":
main()