STR extensions
EIVE/-/pipeline/head This commit looks good Details

This commit is contained in:
Robin Müller 2023-10-09 16:31:04 +02:00
parent 783bdd297a
commit ead9c20888
Signed by: muellerr
GPG Key ID: FCE0B2BD2195142F
1 changed files with 71 additions and 0 deletions

View File

@ -129,6 +129,11 @@ class SetId(enum.IntEnum):
LIMITS = 68
CENTROIDING = 72
LISA = 73
MATCHED_CENTROIDS = 89
BLOB = 90
BLOBS = 91
CENTROID = 92
CENTROIDS = 93
class FileDefs:
@ -682,6 +687,16 @@ def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
handle_solution_set(hk_data, pw)
elif set_id == SetId.TEMPERATURE:
handle_temperature_set(hk_data, pw)
elif set_id == SetId.MATCHED_CENTROIDS:
handle_matched_centroids_set(hk_data, pw)
elif set_id == SetId.BLOB:
handle_blob_set(hk_data, pw)
elif set_id == SetId.BLOBS:
handle_blobs_set(hk_data, pw)
elif set_id == SetId.CENTROID:
handle_centroid_set(hk_data, pw)
elif set_id == SetId.CENTROIDS:
handle_centroids_set(hk_data, pw)
else:
_LOGGER.warning(f"HK parsing for Star Tracker set ID {set_id} unimplemented")
@ -792,6 +807,62 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23)
def handle_blob_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog("Received Blob set")
if len(hk_data) < 14:
_LOGGER.warning(f"Blob dataset HK data with length {len(hk_data)} too short")
return
current_idx = unpack_time_hk(hk_data, 0, pw)
blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])
pw.dlog(f"Blob count: {blob_count}")
def handle_blobs_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog("Received Blobs set")
if len(hk_data) < 6 + 2 * 2 * 8:
_LOGGER.warning(f"Blobs dataset HK data with length {len(hk_data)} too short")
return
current_idx = unpack_time_hk(hk_data, 0, pw)
fmt_str = "!HHH"
inc_len = struct.calcsize(fmt_str)
count, count_used, nr_4lines_skipped = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
current_idx += inc_len
pw.dlog(
f"Count {count} | Count Used {count_used} | Number of skipped 4lines {nr_4lines_skipped}"
)
# TODO: Parse X, Y times 8
def handle_centroid_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog("Received Centroid set")
if len(hk_data) < 14:
raise ValueError(
f"Centroid dataset HK data with length {len(hk_data)} too short"
)
current_idx = unpack_time_hk(hk_data, 0, pw)
centroid_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])
pw.dlog(f"Blob count: {centroid_count}")
def handle_centroids_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog("Received Centroids set")
current_idx = unpack_time_hk(hk_data, 0, pw)
centroids_count = struct.unpack("!H", hk_data[current_idx : current_idx + 2])
pw.dlog(f"Centroids count: {centroids_count}")
# TODO: Parse X, Y, M times 16
def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog("Received Matched Centroids set")
current_idx = unpack_time_hk(hk_data, 0, pw)
num_matched_centroids = hk_data[current_idx]
current_idx += 1
# TODO: Parse ID, X, Y, ERROR X, ERROR Y times 16
pass
@tmtc_definitions_provider
def add_str_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()