eive-tmtc/tmtcc.py

240 lines
8.0 KiB
Python
Raw Normal View History

2022-07-04 18:09:24 +02:00
#!/usr/bin/env python3
2022-07-04 15:22:53 +02:00
import logging
2022-05-17 11:13:32 +02:00
import sys
2022-07-04 18:09:24 +02:00
import time
2022-05-17 11:13:32 +02:00
import traceback
2022-07-04 15:22:53 +02:00
2022-07-14 15:45:57 +02:00
from tmtccmd.tc.handler import SendCbParams
2022-05-17 11:13:32 +02:00
# Abort early with an actionable hint if the spacepackets dependency is missing.
try:
    import spacepackets
except ImportError as error:
    print(error)
    print("Python spacepackets module could not be imported")
    # The hint must be a command the user can copy verbatim, so the pip
    # sub-command has to be spelled correctly ("install", not "intall").
    print(
        'Install with "cd spacepackets && python3 -m pip install -e ." for interactive installation'
    )
    sys.exit(1)
try:
2022-07-04 18:14:51 +02:00
import tmtccmd
2022-05-17 11:13:32 +02:00
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
tb = traceback.format_exc()
print(tb)
print("Python tmtccmd submodule could not be imported")
sys.exit(1)
2022-07-04 18:14:51 +02:00
from spacepackets.ecss import PusVerificator
from tmtccmd import get_console_logger, TcHandlerBase, BackendBase
2022-07-13 10:42:48 +02:00
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
2022-07-04 18:14:51 +02:00
from tmtccmd.logging.pus import (
RawTmtcTimedLogWrapper,
RegularTmtcLogWrapper,
TimedLogWhen,
)
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase, CcsdsTmHandler
from tmtccmd.core import BackendRequest
from tmtccmd.logging import get_current_time_string
from tmtccmd.tc import (
ProcedureHelper,
FeedWrapper,
TcProcedureType,
TcQueueEntryType,
2022-08-08 16:32:18 +02:00
DefaultPusQueueHelper,
2022-07-04 18:14:51 +02:00
)
from tmtccmd.config import default_json_path, SetupWrapper
from tmtccmd.config.args import (
SetupParams,
ArgParserWrapper,
)
2022-05-17 11:13:32 +02:00
from config import __version__
from config.definitions import PUS_APID
2022-07-05 02:12:54 +02:00
from config.hook import EiveHookObject
2022-07-04 15:22:53 +02:00
from pus_tm.factory_hook import pus_factory_hook
2022-07-04 18:09:24 +02:00
from pus_tc.procedure_packer import handle_default_procedure
2022-07-04 15:22:53 +02:00
# Console logger shared by the handlers and main() in this module
LOGGER = get_console_logger()
# Put rotating file logger parameters here for quick changes
# Both values are passed to RawTmtcTimedLogWrapper (as `when` and `interval`) in main()
ROTATING_TIMED_LOGGER_INTERVAL_WHEN = TimedLogWhen.PER_MINUTE
ROTATING_TIMED_LOGGER_INTERVAL = 30
2022-07-04 15:22:53 +02:00
class PusHandler(SpecificApidHandlerBase):
    """TM handler registered for ``PUS_APID``.

    Each packet passed to :meth:`handle_tm` is forwarded to ``pus_factory_hook``
    together with the verification wrapper, printer and raw logger that were
    supplied at construction time.
    """

    def __init__(
        self,
        wrapper: VerificationWrapper,
        printer: FsfwTmTcPrinter,
        raw_logger: RawTmtcTimedLogWrapper,
    ):
        """Store the helper objects and register with the base class.

        :param wrapper: Verification wrapper handed to the PUS factory hook.
        :param printer: Printer handed to the PUS factory hook.
        :param raw_logger: Raw logger handed to the PUS factory hook.
        """
        # The base class receives the APID and None as its second argument.
        super().__init__(PUS_APID, None)
        self.verif_wrapper = wrapper
        self.raw_logger = raw_logger
        self.printer = printer

    def handle_tm(self, packet: bytes, _user_args: any):
        """Pass the raw packet on to the PUS factory hook; ``_user_args`` is unused."""
        pus_factory_hook(
            packet,
            self.verif_wrapper,
            self.printer,
            self.raw_logger,
        )
class UnknownApidHandler(GenericApidHandlerBase):
    """Generic TM handler which only reports the APID of the packet it is given.

    It is installed as the ``generic_handler`` of the CCSDS TM handler in
    ``setup_tmtc``.
    """

    def handle_tm(self, apid: int, _packet: bytes, _user_args: any):
        """Log a warning naming the APID; packet content and user args are ignored."""
        LOGGER.warning(f"Packet with unknown APID {apid} detected")
class TcHandler(TcHandlerBase):
    """Telecommand handler of the EIVE TMTC client.

    Implements the three callbacks visible here: feeding a queue for a
    procedure (:meth:`feed_cb`), sending or logging a single queue entry
    (:meth:`send_cb`) and reporting a finished queue
    (:meth:`queue_finished_cb`).
    """

    def __init__(
        self,
        seq_count_provider: FileSeqCountProvider,
        pus_verificator: PusVerificator,
        file_logger: logging.Logger,
        raw_logger: RawTmtcTimedLogWrapper,
        gui: bool,
    ):
        """Store the collaborators and create the PUS queue helper.

        :param seq_count_provider: Source of the sequence count stamped onto
            every PUS TC in :meth:`send_cb`.
        :param pus_verificator: Verificator every sent PUS TC is added to.
        :param file_logger: Logger receiving the sent-TC and log-entry lines.
        :param raw_logger: Logger receiving every sent PUS TC via ``log_tc``.
        :param gui: Stored as ``self.gui``; not read inside this class.
        """
        super().__init__()
        self.seq_count_provider = seq_count_provider
        self.pus_verificator = pus_verificator
        self.file_logger = file_logger
        self.raw_logger = raw_logger
        self.gui = gui
        # The queue wrapper is not known yet; feed_cb assigns it on each call.
        self.queue_helper = DefaultPusQueueHelper(
            queue_wrapper=None,
            pus_apid=PUS_APID,
            seq_cnt_provider=seq_count_provider,
            pus_verificator=pus_verificator,
        )

    def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
        """Point the queue helper at the supplied queue and pack default procedures.

        Procedure types other than ``TcProcedureType.DEFAULT`` are ignored.
        """
        self.queue_helper.queue_wrapper = wrapper.queue_wrapper
        if info.proc_type == TcProcedureType.DEFAULT:
            handle_default_procedure(self, info.to_def_procedure(), self.queue_helper)

    def send_cb(self, send_params: SendCbParams):
        """Handle one queue entry: send PUS telecommands, log LOG entries."""
        entry_helper = send_params.entry
        if entry_helper.is_tc:
            if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
                pus_tc_wrapper = entry_helper.to_pus_tc_entry()
                pus_tc_wrapper.pus_tc.seq_count = (
                    self.seq_count_provider.get_and_increment()
                )
                pus_tc_wrapper.pus_tc.apid = PUS_APID
                # Add TC after Sequence Count stamping
                self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
                raw_tc = pus_tc_wrapper.pus_tc.pack()
                self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
                tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
                LOGGER.info(tc_info_string)
                self.file_logger.info(
                    f"{get_current_time_string(True)}: {tc_info_string}"
                )
                send_params.com_if.send(raw_tc)
        # NOTE(review): presumably LOG entries are not flagged as TCs, so this
        # branch pairs with the outer `is_tc` check - confirm against upstream.
        elif entry_helper.entry_type == TcQueueEntryType.LOG:
            log_entry = entry_helper.to_log_entry()
            LOGGER.info(log_entry.log_str)
            self.file_logger.info(log_entry.log_str)

    def queue_finished_cb(self, info: ProcedureHelper):
        """Log service and op code once the queue of a default procedure is done.

        ``info.proc_type`` is a procedure type, so it is compared against
        ``TcProcedureType.DEFAULT`` (the same check :meth:`feed_cb` uses)
        rather than against a queue entry type.
        """
        if info is not None and info.proc_type == TcProcedureType.DEFAULT:
            def_proc = info.to_def_procedure()
            LOGGER.info(
                f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
            )
2022-05-17 11:13:32 +02:00
2022-07-04 18:09:24 +02:00
def setup_params() -> SetupWrapper:
    """Print the version banner, parse the CLI arguments and build the setup wrapper.

    :return: A ``SetupWrapper`` holding the EIVE hook object and the setup
        parameters filled in by the argument parser, with ``apid`` set to
        ``PUS_APID``.
    """
    print(f"-- eive tmtc v{__version__} --")
    print(f"-- spacepackets v{spacepackets.__version__} --")
    hook = EiveHookObject(default_json_path())
    setup_args = SetupParams()
    arg_parser = ArgParserWrapper(hook)
    arg_parser.parse()
    tmtccmd.init_printout(arg_parser.use_gui)
    # Let the parser fill in the parameters first, then pin the APID.
    arg_parser.set_params(setup_args)
    setup_args.apid = PUS_APID
    return SetupWrapper(hook_obj=hook, setup_params=setup_args)
def setup_tmtc(
    verificator: PusVerificator,
    printer: FsfwTmTcPrinter,
    raw_logger: RawTmtcTimedLogWrapper,
    gui: bool,
) -> (CcsdsTmHandler, TcHandler):
    """Create the telemetry and telecommand handlers.

    :param verificator: PUS verificator shared by the TM and the TC side.
    :param printer: Printer given to the PUS TM handler; its ``file_logger``
        is also used for the verification wrapper and the TC handler.
    :param raw_logger: Raw logger given to both handlers.
    :param gui: Forwarded unchanged to the ``TcHandler``.
    :return: Tuple ``(ccsds_handler, tc_handler)``.
    """
    file_logger = printer.file_logger
    # TM side: PUS packets go to PusHandler, everything else to the generic handler.
    pus_tm_handler = PusHandler(
        VerificationWrapper(verificator, LOGGER, file_logger),
        printer,
        raw_logger,
    )
    tm_handler = CcsdsTmHandler(generic_handler=UnknownApidHandler(None))
    tm_handler.add_apid_handler(pus_tm_handler)
    # TC side
    tc_handler = TcHandler(
        seq_count_provider=PusFileSeqCountProvider(),
        pus_verificator=verificator,
        file_logger=file_logger,
        raw_logger=raw_logger,
        gui=gui,
    )
    return tm_handler, tc_handler
2022-05-17 11:13:32 +02:00
2022-07-04 18:09:24 +02:00
def setup_backend(
    setup_wrapper: SetupWrapper,
    tc_handler: TcHandler,
    ccsds_handler: CcsdsTmHandler,
) -> BackendBase:
    """Create the default tmtccmd backend from the handlers and start it.

    :param setup_wrapper: Setup wrapper; its ``hook_obj`` is passed to ``tmtccmd.start``.
    :param tc_handler: Telecommand handler used by the backend.
    :param ccsds_handler: Telemetry handler used by the backend.
    :return: The backend that was created and started.
    """
    backend = tmtccmd.create_default_tmtc_backend(
        setup_wrapper=setup_wrapper,
        tm_handler=ccsds_handler,
        tc_handler=tc_handler,
    )
    tmtccmd.start(tmtc_backend=backend, hook_obj=setup_wrapper.hook_obj)
    return backend
def main():
    """Set up the TMTC client and drive the backend until it asks to terminate.

    A ``KeyboardInterrupt`` during argument parsing or inside the main loop
    ends the process with exit code 0.
    """
    try:
        setup_wrapper = setup_params()
    except KeyboardInterrupt as e:
        LOGGER.info(f"{e}. Exiting")
        sys.exit(0)

    # Loggers, verificator and handlers
    tmtc_logger = RegularTmtcLogWrapper()
    printer = FsfwTmTcPrinter(tmtc_logger.logger)
    raw_logger = RawTmtcTimedLogWrapper(
        when=ROTATING_TIMED_LOGGER_INTERVAL_WHEN,
        interval=ROTATING_TIMED_LOGGER_INTERVAL,
    )
    pus_verificator = PusVerificator()
    ccsds_handler, tc_handler = setup_tmtc(
        pus_verificator, printer, raw_logger, setup_wrapper.params.use_gui
    )

    tmtccmd.setup(setup_wrapper)
    tmtc_backend = setup_backend(
        setup_wrapper=setup_wrapper,
        ccsds_handler=ccsds_handler,
        tc_handler=tc_handler,
    )

    try:
        while True:
            state = tmtc_backend.periodic_op(None)
            request = state.request
            if request == BackendRequest.TERMINATION_NO_ERROR:
                sys.exit(0)
            elif request == BackendRequest.DELAY_IDLE:
                LOGGER.info("TMTC Client in IDLE mode")
                time.sleep(3.0)
            elif request == BackendRequest.DELAY_LISTENER:
                time.sleep(0.5)
            elif request == BackendRequest.DELAY_CUSTOM:
                # Sleep for the requested delay, but never longer than 0.5 s at once
                time.sleep(min(state.next_delay.total_seconds(), 0.5))
            elif request == BackendRequest.CALL_NEXT:
                # Call periodic_op again immediately
                pass
    except KeyboardInterrupt:
        sys.exit(0)
# Start the client only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()