220 lines
7.6 KiB
Python
220 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
import logging
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
from spacepackets.ecss import PusVerificator
|
|
from tmtccmd import get_console_logger, TcHandlerBase, BackendBase
|
|
from tmtccmd.com_if import ComInterface
|
|
|
|
from deps.tmtccmd.tmtccmd.logging.pus import (
|
|
RawTmtcTimedLogWrapper,
|
|
RegularTmtcLogWrapper,
|
|
TimedLogWhen,
|
|
)
|
|
from deps.tmtccmd.tmtccmd.pus import VerificationWrapper
|
|
from deps.tmtccmd.tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase
|
|
from deps.tmtccmd.tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
|
from tmtccmd.core import BackendRequest
|
|
from tmtccmd.logging import get_current_time_string
|
|
from tmtccmd.pus import FileSeqCountProvider
|
|
from tmtccmd.tc import (
|
|
ProcedureHelper,
|
|
FeedWrapper,
|
|
TcProcedureType,
|
|
QueueEntryHelper,
|
|
TcQueueEntryType,
|
|
)
|
|
|
|
try:
|
|
import spacepackets
|
|
except ImportError as error:
|
|
print(error)
|
|
print("Python spacepackets module could not be imported")
|
|
print(
|
|
'Install with "cd spacepackets && python3 -m pip intall -e ." for interative installation'
|
|
)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
import tmtccmd.runner as tmtccmd
|
|
from tmtccmd.logging.pus import create_tmtc_logger
|
|
from tmtccmd.ccsds.handler import ApidHandler, CcsdsTmHandler
|
|
from tmtccmd.config import SetupArgs, default_json_path, SetupWrapper
|
|
from tmtccmd.config.args import (
|
|
create_default_args_parser,
|
|
add_default_tmtccmd_args,
|
|
parse_default_input_arguments,
|
|
SetupParams,
|
|
ArgParserWrapper,
|
|
)
|
|
except ImportError as error:
|
|
run_tmtc_commander = None
|
|
initialize_tmtc_commander = None
|
|
tb = traceback.format_exc()
|
|
print(tb)
|
|
print("Python tmtccmd submodule could not be imported")
|
|
sys.exit(1)
|
|
|
|
from config import __version__
|
|
from config.definitions import PUS_APID
|
|
from config.hook_implementations import EiveHookObject
|
|
from pus_tm.factory_hook import pus_factory_hook
|
|
from pus_tc.procedure_packer import handle_default_procedure
|
|
|
|
LOGGER = get_console_logger()
|
|
|
|
|
|
class PusHandler(SpecificApidHandlerBase):
|
|
def __init__(
|
|
self,
|
|
wrapper: VerificationWrapper,
|
|
printer: FsfwTmTcPrinter,
|
|
raw_logger: RawTmtcTimedLogWrapper,
|
|
):
|
|
super().__init__(PUS_APID, None)
|
|
self.printer = printer
|
|
self.verif_wrapper = wrapper
|
|
self.raw_logger = raw_logger
|
|
|
|
def handle_tm(self, packet: bytes, _user_args: any):
|
|
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger)
|
|
|
|
|
|
class UnknownApidHandler(GenericApidHandlerBase):
|
|
def handle_tm(self, apid: int, _packet: bytes, _user_args: any):
|
|
LOGGER.warning(f"Packet with unknwon APID {apid} detected")
|
|
|
|
|
|
class TcHandler(TcHandlerBase):
|
|
def __init__(
|
|
self,
|
|
seq_count_provider: FileSeqCountProvider,
|
|
pus_verificator: PusVerificator,
|
|
file_logger: logging.Logger,
|
|
raw_logger: RawTmtcTimedLogWrapper,
|
|
):
|
|
super().__init__()
|
|
self.seq_count_provider = seq_count_provider
|
|
self.pus_verificator = pus_verificator
|
|
self.file_logger = file_logger
|
|
self.raw_logger = raw_logger
|
|
|
|
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
|
|
if info.proc_type == TcProcedureType.DEFAULT:
|
|
handle_default_procedure(info.to_def_procedure(), wrapper)
|
|
|
|
def send_cb(self, entry_helper: QueueEntryHelper, com_if: ComInterface):
|
|
if entry_helper.is_tc:
|
|
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
|
|
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
|
|
pus_tc_wrapper.pus_tc.seq_count = (
|
|
self.seq_count_provider.next_seq_count()
|
|
)
|
|
pus_tc_wrapper.pus_tc.apid = PUS_APID
|
|
# Add TC after Sequence Count stamping
|
|
self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
|
|
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
|
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
|
|
tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
|
|
LOGGER.info(tc_info_string)
|
|
self.file_logger.info(
|
|
f"{get_current_time_string(True)}: {tc_info_string}"
|
|
)
|
|
com_if.send(raw_tc)
|
|
elif entry_helper.entry_type == TcQueueEntryType.LOG:
|
|
log_entry = entry_helper.to_log_entry()
|
|
LOGGER.info(log_entry.log_str)
|
|
self.file_logger.info(log_entry.log_str)
|
|
|
|
def queue_finished_cb(self, info: ProcedureHelper):
|
|
if info is not None and info.proc_type == TcQueueEntryType.PUS_TC:
|
|
def_proc = info.to_def_procedure()
|
|
LOGGER.info(
|
|
f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}"
|
|
)
|
|
|
|
|
|
def setup_params() -> SetupWrapper:
|
|
print(f"-- eive tmtc v{__version__} --")
|
|
print(f"-- spacepackets v{spacepackets.__version__} --")
|
|
hook_obj = EiveHookObject(default_json_path())
|
|
params = SetupParams()
|
|
parser_wrapper = ArgParserWrapper(hook_obj)
|
|
parser_wrapper.parse()
|
|
tmtccmd.init_printout(parser_wrapper.use_gui)
|
|
parser_wrapper.set_params(params)
|
|
params.apid = PUS_APID
|
|
setup_wrapper = SetupWrapper(hook_obj=hook_obj, setup_params=params)
|
|
return setup_wrapper
|
|
|
|
|
|
def setup_tmtc(
|
|
verificator: PusVerificator,
|
|
printer: FsfwTmTcPrinter,
|
|
raw_logger: RawTmtcTimedLogWrapper,
|
|
) -> (CcsdsTmHandler, TcHandler):
|
|
verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger)
|
|
pus_handler = PusHandler(verification_wrapper, printer, raw_logger)
|
|
ccsds_handler = CcsdsTmHandler(generic_handler=UnknownApidHandler(None))
|
|
ccsds_handler.add_apid_handler(pus_handler)
|
|
seq_count_provider = FileSeqCountProvider()
|
|
tc_handler = TcHandler(
|
|
seq_count_provider=seq_count_provider,
|
|
pus_verificator=verificator,
|
|
file_logger=printer.file_logger,
|
|
raw_logger=raw_logger,
|
|
)
|
|
return ccsds_handler, tc_handler
|
|
|
|
|
|
def setup_backend(
|
|
setup_wrapper: SetupWrapper,
|
|
tc_handler: TcHandler,
|
|
ccsds_handler: CcsdsTmHandler,
|
|
) -> BackendBase:
|
|
tmtc_backend = tmtccmd.create_default_tmtc_backend(
|
|
setup_wrapper=setup_wrapper, tm_handler=ccsds_handler, tc_handler=tc_handler
|
|
)
|
|
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=setup_wrapper.hook_obj)
|
|
return tmtc_backend
|
|
|
|
|
|
def main():
|
|
try:
|
|
setup_wrapper = setup_params()
|
|
except KeyboardInterrupt as e:
|
|
LOGGER.info(f"{e}. Exiting")
|
|
sys.exit(0)
|
|
tmtc_logger = RegularTmtcLogWrapper()
|
|
printer = FsfwTmTcPrinter(tmtc_logger.logger)
|
|
raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=2)
|
|
pus_verificator = PusVerificator()
|
|
ccsds_handler, tc_handler = setup_tmtc(pus_verificator, printer, raw_logger)
|
|
|
|
tmtccmd.setup(setup_wrapper)
|
|
tmtc_backend = setup_backend(
|
|
setup_wrapper=setup_wrapper, ccsds_handler=ccsds_handler, tc_handler=tc_handler
|
|
)
|
|
try:
|
|
while True:
|
|
state = tmtc_backend.periodic_op(None)
|
|
if state.request == BackendRequest.TERMINATION_NO_ERROR:
|
|
sys.exit(0)
|
|
elif state.request == BackendRequest.DELAY_IDLE:
|
|
LOGGER.info("TMTC Client in IDLE mode")
|
|
time.sleep(3.0)
|
|
elif state.request == BackendRequest.DELAY_LISTENER:
|
|
time.sleep(0.8)
|
|
elif state.request == BackendRequest.DELAY_CUSTOM:
|
|
time.sleep(state.next_delay.total_seconds())
|
|
elif state.request == BackendRequest.CALL_NEXT:
|
|
pass
|
|
except KeyboardInterrupt:
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|