From 88161ed1257fba717cc8a23bdad2902b06af35c9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 17 Aug 2023 11:33:42 +0200 Subject: [PATCH] more cleaning up, new cfdp module --- eive_tmtc/cfdp/__init__.py | 0 eive_tmtc/cfdp/fault_handler.py | 15 +++ eive_tmtc/cfdp/user.py | 52 +++++++++ eive_tmtc/pus_tc/tc_handler.py | 147 ++++++++++++++++++++++++ tmtcc.py | 198 +------------------------------- 5 files changed, 219 insertions(+), 193 deletions(-) create mode 100644 eive_tmtc/cfdp/__init__.py create mode 100644 eive_tmtc/cfdp/fault_handler.py create mode 100644 eive_tmtc/cfdp/user.py create mode 100644 eive_tmtc/pus_tc/tc_handler.py diff --git a/eive_tmtc/cfdp/__init__.py b/eive_tmtc/cfdp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eive_tmtc/cfdp/fault_handler.py b/eive_tmtc/cfdp/fault_handler.py new file mode 100644 index 0000000..baa061b --- /dev/null +++ b/eive_tmtc/cfdp/fault_handler.py @@ -0,0 +1,15 @@ +from tmtccmd.cfdp.mib import DefaultFaultHandlerBase + + +class EiveCfdpFaultHandler(DefaultFaultHandlerBase): + def notice_of_suspension_cb(self, cond: ConditionCode): + pass + + def notice_of_cancellation_cb(self, cond: ConditionCode): + pass + + def abandoned_cb(self, cond: ConditionCode): + pass + + def ignore_cb(self, cond: ConditionCode): + pass diff --git a/eive_tmtc/cfdp/user.py b/eive_tmtc/cfdp/user.py new file mode 100644 index 0000000..7a25ad2 --- /dev/null +++ b/eive_tmtc/cfdp/user.py @@ -0,0 +1,52 @@ +import logging + +from spacepackets.cfdp import ConditionCode +from tmtccmd.cfdp import CfdpUserBase, TransactionId +from tmtccmd.cfdp.user import ( + TransactionFinishedParams, + MetadataRecvParams, + FileSegmentRecvdParams, +) + +_LOGGER = logging.getLogger(__name__) + + +class EiveCfdpUser(CfdpUserBase): + def transaction_indication(self, transaction_id: TransactionId): + _LOGGER.info(f"CFDP User: Start of File {transaction_id}") + + def eof_sent_indication(self, transaction_id: TransactionId): + _LOGGER.info(f"CFDP User: EOF sent for {transaction_id}") + + def transaction_finished_indication(self, params: TransactionFinishedParams): + _LOGGER.info(f"CFDP User: {params.transaction_id} finished") + + def metadata_recv_indication(self, params: MetadataRecvParams): + pass + + def file_segment_recv_indication(self, params: FileSegmentRecvdParams): + pass + + def report_indication(self, transaction_id: TransactionId, status_report: any): + pass + + def suspended_indication( + self, transaction_id: TransactionId, cond_code: ConditionCode + ): + pass + + def resumed_indication(self, transaction_id: TransactionId, progress: int): + pass + + def fault_indication( + self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int + ): + pass + + def abandoned_indication( + self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int + ): + pass + + def eof_recv_indication(self, transaction_id: TransactionId): + pass diff --git a/eive_tmtc/pus_tc/tc_handler.py b/eive_tmtc/pus_tc/tc_handler.py new file mode 100644 index 0000000..596e5ec --- /dev/null +++ b/eive_tmtc/pus_tc/tc_handler.py @@ -0,0 +1,147 @@ +import logging +from typing import cast + +from eive_tmtc.config.definitions import CFDP_REMOTE_ENTITY_ID, PUS_APID +from eive_tmtc.pus_tc.procedure_packer import handle_default_procedure +from tmtcc import CfdpInCcsdsWrapper +from tmtccmd import TcHandlerBase, ProcedureWrapper +from tmtccmd.cfdp.defs import CfdpRequestType +from tmtccmd.logging import get_current_time_string +from tmtccmd.logging.pus import RawTmtcTimedLogWrapper +from tmtccmd.tc import ( + DefaultPusQueueHelper, + QueueWrapper, + FeedWrapper, + TcProcedureType, + SendCbParams, + TcQueueEntryType, +) +from spacepackets.ecss import PusVerificator +from tmtccmd.util import FileSeqCountProvider +from spacepackets.cfdp import PduHolder, DirectiveType + + +_LOGGER = logging.getLogger(__name__) + + +class TcHandler(TcHandlerBase): + def __init__( + self, + seq_count_provider: FileSeqCountProvider, + cfdp_in_ccsds_wrapper: CfdpInCcsdsWrapper, + pus_verificator: PusVerificator, + high_level_file_logger: logging.Logger, + raw_pus_logger: RawTmtcTimedLogWrapper, + gui: bool, + ): + super().__init__() + self.cfdp_handler_started = False + self.cfdp_dest_id = CFDP_REMOTE_ENTITY_ID + self.seq_count_provider = seq_count_provider + self.pus_verificator = pus_verificator + self.high_level_file_logger = high_level_file_logger + self.pus_raw_logger = raw_pus_logger + self.gui = gui + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + default_pus_apid=PUS_APID, + seq_cnt_provider=seq_count_provider, + pus_verificator=pus_verificator, + tc_sched_timestamp_len=4, + ) + self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper + + def cfdp_done(self) -> bool: + if self.cfdp_handler_started: + if not self.cfdp_in_ccsds_wrapper.handler.put_request_pending(): + self.cfdp_handler_started = False + return True + return False + + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): + self.queue_helper.queue_wrapper = wrapper.queue_wrapper + if info.proc_type == TcProcedureType.DEFAULT: + handle_default_procedure(self, info.to_def_procedure(), self.queue_helper) + elif info.proc_type == TcProcedureType.CFDP: + self.handle_cfdp_procedure(info) + + def send_cb(self, send_params: SendCbParams): + entry_helper = send_params.entry + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + # pus_tc_wrapper.pus_tc.apid = PUS_APID + raw_tc = pus_tc_wrapper.pus_tc.pack() + self.pus_raw_logger.log_tc(pus_tc_wrapper.pus_tc) + tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" + _LOGGER.info(tc_info_string) + self.high_level_file_logger.info( + f"{get_current_time_string(True)}: {tc_info_string}" + ) + # with open("tc.bin", "wb") as of: + # of.write(raw_tc) + send_params.com_if.send(raw_tc) + elif entry_helper.entry_type == TcQueueEntryType.CCSDS_TC: + cfdp_packet_in_ccsds = entry_helper.to_space_packet_entry() + send_params.com_if.send(cfdp_packet_in_ccsds.space_packet.pack()) + # TODO: Log raw CFDP packets similarly to how PUS packets are logged. + # - Log full raw format including space packet wrapper + # - Log context information: Transaction ID, and PDU type and directive + # Could re-use file logger. Should probably do that + # print(f"sending packet: [{cfdp_packet_in_ccsds.space_packet.pack()}]") + # with open(f"cfdp_packet_{self.cfdp_counter}", "wb") as of: + # of.write(cfdp_packet_in_ccsds.space_packet.pack()) + # self.cfdp_counter += 1 + elif entry_helper.entry_type == TcQueueEntryType.LOG: + log_entry = entry_helper.to_log_entry() + _LOGGER.info(log_entry.log_str) + self.high_level_file_logger.info(log_entry.log_str) + + def handle_cfdp_procedure(self, info: ProcedureWrapper): + cfdp_procedure = info.to_cfdp_procedure() + if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT: + if ( + not self.cfdp_in_ccsds_wrapper.handler.put_request_pending() + and not self.cfdp_handler_started + ): + put_req = cfdp_procedure.request_wrapper.to_put_request() + put_req.cfg.destination_id = self.cfdp_dest_id + _LOGGER.info( + f"CFDP: Starting file put request with parameters:\n{put_req}" + ) + self.cfdp_in_ccsds_wrapper.handler.cfdp_handler.put_request(put_req) + self.cfdp_handler_started = True + + for source_pair, dest_pair in self.cfdp_in_ccsds_wrapper.handler: + pdu, sp = source_pair + pdu = cast(PduHolder, pdu) + if pdu.is_file_directive: + if pdu.pdu_directive_type == DirectiveType.METADATA_PDU: + metadata = pdu.to_metadata_pdu() + self.queue_helper.add_log_cmd( + f"CFDP Source: Sending Metadata PDU for file with size " + f"{metadata.file_size}" + ) + elif pdu.pdu_directive_type == DirectiveType.EOF_PDU: + self.queue_helper.add_log_cmd( + "CFDP Source: Sending EOF PDU" + ) + else: + fd_pdu = pdu.to_file_data_pdu() + self.queue_helper.add_log_cmd( + f"CFDP Source: Sending File Data PDU for segment at offset " + f"{fd_pdu.offset} with length {len(fd_pdu.file_data)}" + ) + self.queue_helper.add_ccsds_tc(sp) + self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent() + self.cfdp_in_ccsds_wrapper.handler.source_handler.state_machine() + + def queue_finished_cb(self, info: ProcedureWrapper): + if info is not None: + if info.proc_type == TcQueueEntryType.PUS_TC: + def_proc = info.to_def_procedure() + _LOGGER.info( + f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" + ) + elif info.proc_type == TcProcedureType.CFDP: + _LOGGER.info("Finished CFDP queue") diff --git a/tmtcc.py b/tmtcc.py index d6d3645..ec19fdf 100755 --- a/tmtcc.py +++ b/tmtcc.py @@ -4,21 +4,22 @@ import sys import time import traceback from pathlib import Path -from typing import cast +from eive_tmtc.cfdp.fault_handler import EiveCfdpFaultHandler +from eive_tmtc.cfdp.user import EiveCfdpUser from spacepackets.ccsds import SPACE_PACKET_HEADER_SIZE from spacepackets.cfdp import ( ConditionCode, ChecksumType, TransmissionMode, - PduHolder, DirectiveType, PduFactory, PduType, ) + +from eive_tmtc.pus_tc.tc_handler import TcHandler from tmtccmd.logging import add_colorlog_console_logger from tmtccmd.cfdp import CfdpUserBase, TransactionId -from tmtccmd.cfdp.defs import CfdpRequestType from tmtccmd.cfdp.handler import CfdpInCcsdsHandler from tmtccmd.cfdp.mib import ( DefaultFaultHandlerBase, @@ -31,7 +32,6 @@ from tmtccmd.cfdp.user import ( MetadataRecvParams, FileSegmentRecvdParams, ) -from tmtccmd.tc.handler import SendCbParams try: import spacepackets @@ -54,7 +54,7 @@ except ImportError: sys.exit(1) from spacepackets.ecss import PusVerificator -from tmtccmd import TcHandlerBase, BackendBase +from tmtccmd import BackendBase from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter @@ -66,15 +66,6 @@ from tmtccmd.logging.pus import ( from tmtccmd.pus import VerificationWrapper from tmtccmd.tm import SpecificApidHandlerBase, GenericApidHandlerBase, CcsdsTmHandler from tmtccmd.core import BackendRequest -from tmtccmd.logging import get_current_time_string -from tmtccmd.tc import ( - ProcedureWrapper, - FeedWrapper, - TcProcedureType, - TcQueueEntryType, - DefaultPusQueueHelper, - QueueWrapper, -) from tmtccmd.config import ( default_json_path, SetupWrapper, @@ -95,7 +86,6 @@ from eive_tmtc.config.definitions import ( ) from eive_tmtc.config.hook import EiveHookObject from eive_tmtc.pus_tm.pus_demux import pus_factory_hook -from eive_tmtc.pus_tc.procedure_packer import handle_default_procedure _LOGGER = APP_LOGGER _LOG_LEVEL = logging.INFO @@ -105,61 +95,6 @@ ROTATING_TIMED_LOGGER_INTERVAL_WHEN = TimedLogWhen.PER_MINUTE ROTATING_TIMED_LOGGER_INTERVAL = 30 -class EiveCfdpFaultHandler(DefaultFaultHandlerBase): - def notice_of_suspension_cb(self, cond: ConditionCode): - pass - - def notice_of_cancellation_cb(self, cond: ConditionCode): - pass - - def abandoned_cb(self, cond: ConditionCode): - pass - - def ignore_cb(self, cond: ConditionCode): - pass - - -class EiveCfdpUser(CfdpUserBase): - def transaction_indication(self, transaction_id: TransactionId): - _LOGGER.info(f"CFDP User: Start of File {transaction_id}") - - def eof_sent_indication(self, transaction_id: TransactionId): - _LOGGER.info(f"CFDP User: EOF sent for {transaction_id}") - - def transaction_finished_indication(self, params: TransactionFinishedParams): - _LOGGER.info(f"CFDP User: {params.transaction_id} finished") - - def metadata_recv_indication(self, params: MetadataRecvParams): - pass - - def file_segment_recv_indication(self, params: FileSegmentRecvdParams): - pass - - def report_indication(self, transaction_id: TransactionId, status_report: any): - pass - - def suspended_indication( - self, transaction_id: TransactionId, cond_code: ConditionCode - ): - pass - - def resumed_indication(self, transaction_id: TransactionId, progress: int): - pass - - def fault_indication( - self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int - ): - pass - - def abandoned_indication( - self, transaction_id: TransactionId, cond_code: ConditionCode, progress: int - ): - pass - - def eof_recv_indication(self, transaction_id: TransactionId): - pass - - class PusHandler(SpecificApidHandlerBase): def __init__( self, @@ -216,129 +151,6 @@ class CfdpInCcsdsWrapper(SpecificApidHandlerBase): self.handler.pass_pdu_packet(pdu_base) -class TcHandler(TcHandlerBase): - def __init__( - self, - seq_count_provider: FileSeqCountProvider, - cfdp_in_ccsds_wrapper: CfdpInCcsdsWrapper, - pus_verificator: PusVerificator, - high_level_file_logger: logging.Logger, - raw_pus_logger: RawTmtcTimedLogWrapper, - gui: bool, - ): - super().__init__() - self.cfdp_handler_started = False - self.cfdp_dest_id = CFDP_REMOTE_ENTITY_ID - self.seq_count_provider = seq_count_provider - self.pus_verificator = pus_verificator - self.high_level_file_logger = high_level_file_logger - self.pus_raw_logger = raw_pus_logger - self.gui = gui - self.queue_helper = DefaultPusQueueHelper( - queue_wrapper=QueueWrapper.empty(), - default_pus_apid=PUS_APID, - seq_cnt_provider=seq_count_provider, - pus_verificator=pus_verificator, - tc_sched_timestamp_len=4, - ) - self.cfdp_in_ccsds_wrapper = cfdp_in_ccsds_wrapper - - def cfdp_done(self) -> bool: - if self.cfdp_handler_started: - if not self.cfdp_in_ccsds_wrapper.handler.put_request_pending(): - self.cfdp_handler_started = False - return True - return False - - def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): - self.queue_helper.queue_wrapper = wrapper.queue_wrapper - if info.proc_type == TcProcedureType.DEFAULT: - handle_default_procedure(self, info.to_def_procedure(), self.queue_helper) - elif info.proc_type == TcProcedureType.CFDP: - self.handle_cfdp_procedure(info) - - def send_cb(self, send_params: SendCbParams): - entry_helper = send_params.entry - if entry_helper.is_tc: - if entry_helper.entry_type == TcQueueEntryType.PUS_TC: - pus_tc_wrapper = entry_helper.to_pus_tc_entry() - # pus_tc_wrapper.pus_tc.apid = PUS_APID - raw_tc = pus_tc_wrapper.pus_tc.pack() - self.pus_raw_logger.log_tc(pus_tc_wrapper.pus_tc) - tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}" - _LOGGER.info(tc_info_string) - self.high_level_file_logger.info( - f"{get_current_time_string(True)}: {tc_info_string}" - ) - # with open("tc.bin", "wb") as of: - # of.write(raw_tc) - send_params.com_if.send(raw_tc) - elif entry_helper.entry_type == TcQueueEntryType.CCSDS_TC: - cfdp_packet_in_ccsds = entry_helper.to_space_packet_entry() - send_params.com_if.send(cfdp_packet_in_ccsds.space_packet.pack()) - # TODO: Log raw CFDP packets similarly to how PUS packets are logged. - # - Log full raw format including space packet wrapper - # - Log context information: Transaction ID, and PDU type and directive - # Could re-use file logger. Should probably do that - # print(f"sending packet: [{cfdp_packet_in_ccsds.space_packet.pack()}]") - # with open(f"cfdp_packet_{self.cfdp_counter}", "wb") as of: - # of.write(cfdp_packet_in_ccsds.space_packet.pack()) - # self.cfdp_counter += 1 - elif entry_helper.entry_type == TcQueueEntryType.LOG: - log_entry = entry_helper.to_log_entry() - _LOGGER.info(log_entry.log_str) - self.high_level_file_logger.info(log_entry.log_str) - - def handle_cfdp_procedure(self, info: ProcedureWrapper): - cfdp_procedure = info.to_cfdp_procedure() - if cfdp_procedure.cfdp_request_type == CfdpRequestType.PUT: - if ( - not self.cfdp_in_ccsds_wrapper.handler.put_request_pending() - and not self.cfdp_handler_started - ): - put_req = cfdp_procedure.request_wrapper.to_put_request() - put_req.cfg.destination_id = self.cfdp_dest_id - _LOGGER.info( - f"CFDP: Starting file put request with parameters:\n{put_req}" - ) - self.cfdp_in_ccsds_wrapper.handler.cfdp_handler.put_request(put_req) - self.cfdp_handler_started = True - - for source_pair, dest_pair in self.cfdp_in_ccsds_wrapper.handler: - pdu, sp = source_pair - pdu = cast(PduHolder, pdu) - if pdu.is_file_directive: - if pdu.pdu_directive_type == DirectiveType.METADATA_PDU: - metadata = pdu.to_metadata_pdu() - self.queue_helper.add_log_cmd( - f"CFDP Source: Sending Metadata PDU for file with size " - f"{metadata.file_size}" - ) - elif pdu.pdu_directive_type == DirectiveType.EOF_PDU: - self.queue_helper.add_log_cmd( - "CFDP Source: Sending EOF PDU" - ) - else: - fd_pdu = pdu.to_file_data_pdu() - self.queue_helper.add_log_cmd( - f"CFDP Source: Sending File Data PDU for segment at offset " - f"{fd_pdu.offset} with length {len(fd_pdu.file_data)}" - ) - self.queue_helper.add_ccsds_tc(sp) - self.cfdp_in_ccsds_wrapper.handler.confirm_source_packet_sent() - self.cfdp_in_ccsds_wrapper.handler.source_handler.state_machine() - - def queue_finished_cb(self, info: ProcedureWrapper): - if info is not None: - if info.proc_type == TcQueueEntryType.PUS_TC: - def_proc = info.to_def_procedure() - _LOGGER.info( - f"Finished queue for service {def_proc.service} and op code {def_proc.op_code}" - ) - elif info.proc_type == TcProcedureType.CFDP: - _LOGGER.info("Finished CFDP queue") - - def setup_params() -> (SetupWrapper, int): hook_obj = EiveHookObject(default_json_path()) params = SetupParams()