finally added rudimentary HK filter

This commit is contained in:
Robin Müller 2023-10-26 19:49:45 +02:00
parent b3d903115e
commit c4598ff058
Signed by: muellerr
GPG Key ID: A649FB78196E3849
3 changed files with 46 additions and 16 deletions

View File

@ -1,5 +1,7 @@
"""HK Handling for EIVE OBSW""" """HK Handling for EIVE OBSW"""
import dataclasses
import logging import logging
from typing import List
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data
@ -46,8 +48,18 @@ _LOGGER = logging.getLogger(__name__)
FORWARD_SENSOR_TEMPS = False FORWARD_SENSOR_TEMPS = False
@dataclasses.dataclass
class HkFilter:
object_ids: List[ObjectIdU32]
set_ids: List[int]
def handle_hk_packet( def handle_hk_packet(
raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_level: int raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
hk_filter: HkFilter,
hk_level: int,
): ):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
@ -56,17 +68,21 @@ def handle_hk_packet(
if tm_packet.subservice == 25 or tm_packet.subservice == 26: if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:] hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print( if named_obj_id in hk_filter.object_ids:
content_type=HkContentType.HK, handle_regular_hk_print(
object_id=named_obj_id, printer=printer,
set_id=tm_packet.set_id, object_id=named_obj_id,
hk_data=hk_data, hk_packet=tm_packet,
) tm=tm_packet.pus_tm,
hk_data=hk_data,
)
return
try: try:
if hk_level == 1: if hk_level >= 1:
pass printer.generic_hk_tm_print(
elif hk_level > 1: HkContentType.HK, named_obj_id, tm_packet.set_id, hk_data
)
if hk_level >= 1:
handle_regular_hk_print( handle_regular_hk_print(
printer=printer, printer=printer,
object_id=named_obj_id, object_id=named_obj_id,

View File

@ -2,7 +2,7 @@
""" """
import logging import logging
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids, STAR_TRACKER_ID
from spacepackets.ecss import PusTelemetry from spacepackets.ecss import PusTelemetry
from spacepackets.ecss.pus_17_test import Service17Tm from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.util import PrintFormats from spacepackets.util import PrintFormats
@ -14,11 +14,12 @@ from tmtccmd.tm.pus_20_fsfw_param import Service20ParamDumpWrapper
from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice
from tmtccmd.tm.pus_200_fsfw_mode import Subservice as ModeSubservice from tmtccmd.tm.pus_200_fsfw_mode import Subservice as ModeSubservice
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.util import ObjectIdU32
from .defs import PrintWrapper from .defs import PrintWrapper
from .event_handler import handle_event_packet from .event_handler import handle_event_packet
from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout
from .hk_handler import handle_hk_packet from .hk_handler import handle_hk_packet, HkFilter
from .action_reply_handler import handle_action_reply from .action_reply_handler import handle_action_reply
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -30,6 +31,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
hk_level: int, hk_level: int,
hk_filter: HkFilter,
): ):
if len(packet) < 8: if len(packet) < 8:
_LOGGER.warning("Detected packet shorter than 8 bytes!") _LOGGER.warning("Detected packet shorter than 8 bytes!")
@ -50,7 +52,11 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet) handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
elif service == 3: elif service == 3:
handle_hk_packet( handle_hk_packet(
printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level printer=printer,
raw_tm=packet,
obj_id_dict=obj_id_dict,
hk_level=hk_level,
hk_filter=hk_filter,
) )
elif service == 5: elif service == 5:
handle_event_packet(raw_tm=packet, pw=pw) handle_event_packet(raw_tm=packet, pw=pw)

View File

@ -15,7 +15,9 @@ from spacepackets.cfdp import (
TransmissionMode, TransmissionMode,
) )
from eive_tmtc.config.object_ids import STAR_TRACKER_ID
from eive_tmtc.pus_tc.tc_handler import TcHandler from eive_tmtc.pus_tc.tc_handler import TcHandler
from eive_tmtc.pus_tm.hk_handler import HkFilter
from tmtccmd.logging import add_colorlog_console_logger from tmtccmd.logging import add_colorlog_console_logger
from tmtccmd.cfdp.handler import CfdpInCcsdsHandler from tmtccmd.cfdp.handler import CfdpInCcsdsHandler
from tmtccmd.cfdp.mib import ( from tmtccmd.cfdp.mib import (
@ -24,7 +26,7 @@ from tmtccmd.cfdp.mib import (
RemoteEntityCfg, RemoteEntityCfg,
) )
from tmtccmd import BackendBase from tmtccmd import BackendBase
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider, ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.logging.pus import ( from tmtccmd.logging.pus import (
@ -77,12 +79,18 @@ class PusHandler(SpecificApidHandlerBase):
self.verif_wrapper = wrapper self.verif_wrapper = wrapper
self.raw_logger = raw_logger self.raw_logger = raw_logger
self.hk_level = hk_level self.hk_level = hk_level
self.hk_filter = HkFilter(object_ids=[ObjectIdU32(STAR_TRACKER_ID)], set_ids=[])
def handle_tm(self, packet: bytes, _user_args: any): def handle_tm(self, packet: bytes, _user_args: any):
# with open("tc.bin", "wb") as of: # with open("tc.bin", "wb") as of:
# of.write(packet) # of.write(packet)
pus_factory_hook( pus_factory_hook(
packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level packet,
self.verif_wrapper,
self.printer,
self.raw_logger,
self.hk_level,
self.hk_filter,
) )