82 lines
2.8 KiB
Python
82 lines
2.8 KiB
Python
"""This file transfers control of housekeeping handling (PUS service 3) to the developer
|
|
"""
|
|
import struct
|
|
from typing import Tuple
|
|
|
|
from tmtccmd.tm import Service3FsfwTm
|
|
from tmtccmd.tm.pus_3_hk_base import HkContentType, Service3Base
|
|
from tmtccmd.logging import get_console_logger
|
|
|
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
|
|
from tmtccmd.util import ObjectIdDictT, ObjectIdU32
|
|
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
|
|
|
LOGGER = get_console_logger()
|
|
|
|
|
|
def handle_hk_packet(
|
|
raw_tm: bytes,
|
|
obj_id_dict: ObjectIdDictT,
|
|
printer: FsfwTmTcPrinter,
|
|
):
|
|
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
|
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
|
|
if named_obj_id is None:
|
|
named_obj_id = tm_packet.object_id
|
|
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
|
hk_data = tm_packet.tm_data[8:]
|
|
printer.generic_hk_tm_print(
|
|
content_type=HkContentType.HK,
|
|
object_id=named_obj_id,
|
|
set_id=tm_packet.set_id,
|
|
hk_data=hk_data,
|
|
)
|
|
handle_regular_hk_print(
|
|
printer=printer,
|
|
object_id=named_obj_id,
|
|
hk_packet=tm_packet,
|
|
hk_data=hk_data,
|
|
)
|
|
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
|
|
LOGGER.warning("HK definitions printout not implemented yet")
|
|
|
|
|
|
def handle_regular_hk_print(
|
|
printer: FsfwTmTcPrinter,
|
|
object_id: ObjectIdU32,
|
|
hk_packet: Service3Base,
|
|
hk_data: bytes,
|
|
):
|
|
if object_id == TEST_DEVICE_0_ID or object_id == TEST_DEVICE_1_ID:
|
|
handle_test_set_deserialization(hk_data=hk_data)
|
|
|
|
|
|
def handle_test_set_deserialization(
|
|
hk_data: bytes,
|
|
) -> Tuple[list, list, bytearray, int]:
|
|
header_list = []
|
|
content_list = []
|
|
validity_buffer = bytearray()
|
|
# uint8 (1) + uint32_t (4) + float vector with 3 entries (12) + validity buffer (1)
|
|
if len(hk_data) < 18:
|
|
LOGGER.warning("Invalid HK data format for test set reply!")
|
|
return header_list, content_list, validity_buffer, 0
|
|
uint8_value = struct.unpack("!B", hk_data[0:1])[0]
|
|
uint32_value = struct.unpack("!I", hk_data[1:5])[0]
|
|
float_value_1 = struct.unpack("!f", hk_data[5:9])[0]
|
|
float_value_2 = struct.unpack("!f", hk_data[9:13])[0]
|
|
float_value_3 = struct.unpack("!f", hk_data[13:17])[0]
|
|
validity_buffer.append(hk_data[17])
|
|
header_list.append("uint8 value")
|
|
header_list.append("uint32 value")
|
|
header_list.append("float vec value 1")
|
|
header_list.append("float vec value 2")
|
|
header_list.append("float vec value 3")
|
|
|
|
content_list.append(uint8_value)
|
|
content_list.append(uint32_value)
|
|
content_list.append(float_value_1)
|
|
content_list.append(float_value_2)
|
|
content_list.append(float_value_3)
|
|
return header_list, content_list, validity_buffer, 3
|