diff --git a/eive_tmtc/tmtc/acs/star_tracker.py b/eive_tmtc/tmtc/acs/star_tracker.py index 28cd4d0..3d52b9f 100644 --- a/eive_tmtc/tmtc/acs/star_tracker.py +++ b/eive_tmtc/tmtc/acs/star_tracker.py @@ -129,11 +129,12 @@ class SetId(enum.IntEnum): LIMITS = 68 CENTROIDING = 72 LISA = 73 - MATCHED_CENTROIDS = 89 - BLOB = 90 - BLOBS = 91 - CENTROID = 92 - CENTROIDS = 93 + AUTO_BLOB = 89 + MATCHED_CENTROIDS = 90 + BLOB = 91 + BLOBS = 92 + CENTROID = 93 + CENTROIDS = 94 class FileDefs: @@ -808,17 +809,18 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper): def handle_blob_set(hk_data: bytes, pw: PrintWrapper): - pw.dlog("Received Blob set") + pw.dlog("Received Blob Set") if len(hk_data) < 14: _LOGGER.warning(f"Blob dataset HK data with length {len(hk_data)} too short") return current_idx = unpack_time_hk(hk_data, 0, pw) blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4]) pw.dlog(f"Blob count: {blob_count}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx + 4 :], num_vars=3) def handle_blobs_set(hk_data: bytes, pw: PrintWrapper): - pw.dlog("Received Blobs set") + pw.dlog("Received Blobs Set") if len(hk_data) < 6 + 2 * 2 * 8: _LOGGER.warning(f"Blobs dataset HK data with length {len(hk_data)} too short") return @@ -832,7 +834,16 @@ def handle_blobs_set(hk_data: bytes, pw: PrintWrapper): pw.dlog( f"Count {count} | Count Used {count_used} | Number of skipped 4lines {nr_4lines_skipped}" ) - # TODO: Parse X, Y times 8 + fmt_coords = "!HHHHHHHH" + inc_len = struct.calcsize(fmt_coords) + x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog("{:<8} {:<8}".format("X", "Y")) + for idx in range(8): + pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx])) + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=7) def handle_centroid_set(hk_data: bytes, pw: PrintWrapper): @@ -843,24 +854,75 @@ def handle_centroid_set(hk_data: bytes, pw: PrintWrapper): ) current_idx = unpack_time_hk(hk_data, 0, pw) centroid_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4]) - pw.dlog(f"Blob count: {centroid_count}") + pw.dlog(f"Centroid count: {centroid_count}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) def handle_centroids_set(hk_data: bytes, pw: PrintWrapper): - pw.dlog("Received Centroids set") + pw.dlog("Received Centroids Set") current_idx = unpack_time_hk(hk_data, 0, pw) centroids_count = struct.unpack("!H", hk_data[current_idx : current_idx + 2]) pw.dlog(f"Centroids count: {centroids_count}") - # TODO: Parse X, Y, M times 16 + fmt_coords = "!HHHHHHHHHHHHHHHH" + inc_len = struct.calcsize(fmt_coords) + x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + magnitude = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog("{:<8} {:<8} {:<8}".format("X", "Y", "Magnitude")) + for idx in range(16): + pw.dlog( + "{:<8} {:<8} {:<8}".format(x_coords[idx], y_coords[idx], magnitude[idx]) + ) + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6) def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper): - pw.dlog("Received Matched Centroids set") + pw.dlog("Received Matched Centroids Set") current_idx = unpack_time_hk(hk_data, 0, pw) num_matched_centroids = hk_data[current_idx] current_idx += 1 pw.dlog(f"Number of matched centroids {num_matched_centroids}") - # TODO: Parse ID, X, Y, ERROR X, ERROR Y times 16 + fmt_ids = "!IIIIIIIIIIIIIIII" + inc_len = struct.calcsize(fmt_ids) + star_id = struct.unpack(fmt_ids, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + fmt_floats = "!ffffffffffffffff" + inc_len = struct.calcsize(fmt_floats) + x_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + x_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog( + "{:<8} {:<8} {:<8} {:<8} {:<8}".format( + "Star ID", "X", "Y", "X Error", "Y Error" + ) + ) + for idx in range(16): + pw.dlog( + "{:<8} {:<8} {:<8} {:<8} {:<8}".format( + star_id[idx], x_coords[idx], y_coords[idx], x_errors[idx], y_errors[idx] + ) + ) + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8) + + +def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Auto Blob Set") + current_idx = unpack_time_hk(hk_data, 0, pw) + fmt_threshold = "!f" + inc_len = struct.calcsize(fmt_threshold) + threshold = struct.unpack( + fmt_threshold, hk_data[current_idx : current_idx + inc_len] + )[0] + pw.dlog(f"Threshold {threshold}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) @tmtc_definitions_provider