diff --git a/CHANGELOG.md b/CHANGELOG.md index 98d8aa2..8d9e7c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ list yields a list of all related PRs for each release. ## Added - Added commands to unlock and use STR secondary firmware slot. +- STR BlobStats TM handling ## Fixed diff --git a/eive_tmtc/pus_tm/hk_handler.py b/eive_tmtc/pus_tm/hk_handler.py index aeb2433..56c53af 100644 --- a/eive_tmtc/pus_tm/hk_handler.py +++ b/eive_tmtc/pus_tm/hk_handler.py @@ -53,7 +53,7 @@ FORWARD_SENSOR_TEMPS = False @dataclasses.dataclass class HkFilter: - object_ids: List[ObjectIdU32] + object_ids: List[bytes] set_ids: List[int] @@ -72,8 +72,7 @@ def handle_hk_packet( named_obj_id = tm_packet.object_id if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] - - if named_obj_id in hk_filter.object_ids: + if named_obj_id.as_bytes in hk_filter.object_ids: # print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") handle_regular_hk_print( printer=printer, diff --git a/eive_tmtc/tmtc/acs/star_tracker.py b/eive_tmtc/tmtc/acs/star_tracker.py index 85fd63f..a92230f 100644 --- a/eive_tmtc/tmtc/acs/star_tracker.py +++ b/eive_tmtc/tmtc/acs/star_tracker.py @@ -9,6 +9,7 @@ import datetime import enum import logging import struct +from typing import List, Tuple from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.utility.input_helper import InputHelper @@ -183,6 +184,7 @@ class SetId(enum.IntEnum): BLOBS = 92 CENTROID = 93 CENTROIDS = 94 + BLOB_STATS = 102 class DataSetRequest(enum.IntEnum): @@ -876,6 +878,8 @@ def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): handle_centroids_set(hk_data, pw) elif set_id == SetId.CONTRAST: handle_contrast_set(hk_data, pw) + elif set_id == SetId.BLOB_STATS: + handle_blob_stats_set(hk_data, pw) else: _LOGGER.warning(f"HK parsing for Star Tracker set ID {set_id} unimplemented") @@ -1178,6 +1182,35 @@ def handle_contrast_set(hk_data: bytes, pw: PrintWrapper): handle_histo_or_contrast_set("Contrast", hk_data, pw) +def handle_blob_stats_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Blob Stats Set") + if len(hk_data) < 65: + raise ValueError( + f"Matched BlobStats set with length {len(hk_data)} too short. Expected 65 bytes." + ) + current_idx = unpack_time_hk(hk_data, 0, pw) + + def fill_list(current_idx: int) -> Tuple[List[int], int]: + list_to_fill = [] + for _ in range(16): + list_to_fill.append(hk_data[current_idx]) + current_idx += 1 + return list_to_fill, current_idx + + noise_list, current_idx = fill_list(current_idx) + threshold_list, current_idx = fill_list(current_idx) + lvalid_list, current_idx = fill_list(current_idx) + oflow_list, current_idx = fill_list(current_idx) + pw.dlog("Index | Noise | Threshold | LValid | Oflow") + for i in range(16): + pw.dlog( + "{:<3} {:<3} {:<3} {:<3} {:<3}".format( + i, noise_list[i], threshold_list[i], lvalid_list[i], oflow_list[i] + ) + ) + pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=4)) + + def handle_star_tracker_action_replies( action_id: int, pw: PrintWrapper, custom_data: bytes ):