fsfw-example-tmtc-common/pus_tm/hk_handling.py

82 lines
2.8 KiB
Python

"""This file transfers control of housekeeping handling (PUS service 3) to the developer
"""
import struct
from typing import Tuple
from tmtccmd.tm import Service3FsfwTm
from tmtccmd.tm.pus_3_hk_base import HkContentType, Service3Base
from tmtccmd.logging import get_console_logger
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
from tmtccmd.util import ObjectIdDictT, ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None:
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data,
)
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,
hk_packet=tm_packet,
hk_data=hk_data,
)
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectIdU32,
hk_packet: Service3Base,
hk_data: bytes,
):
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
handle_test_set_deserialization(hk_data=hk_data)
def handle_test_set_deserialization(
hk_data: bytes,
) -> Tuple[list, list, bytearray, int]:
header_list = []
content_list = []
validity_buffer = bytearray()
# uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1)
if len(hk_data) < 18:
LOGGER.warning("Invalid HK data format for test set reply!")
return header_list, content_list, validity_buffer, 0
uint8_value = struct.unpack("!B", hk_data[0:1])[0]
uint32_value = struct.unpack("!I", hk_data[1:5])[0]
float_value_1 = struct.unpack("!f", hk_data[5:9])[0]
float_value_2 = struct.unpack("!f", hk_data[9:13])[0]
float_value_3 = struct.unpack("!f", hk_data[13:17])[0]
validity_buffer.append(hk_data[17])
header_list.append("uint8 value")
header_list.append("uint32 value")
header_list.append("float vec value 1")
header_list.append("float vec value 2")
header_list.append("float vec value 3")
content_list.append(uint8_value)
content_list.append(uint32_value)
content_list.append(float_value_1)
content_list.append(float_value_2)
content_list.append(float_value_3)
return header_list, content_list, validity_buffer, 3