From 4386b180495b094097177703e31609638d7367a9 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 8 Mar 2023 19:36:57 +0100 Subject: [PATCH 1/2] add dipole set handling --- eive_tmtc/tmtc/acs/imtq.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/eive_tmtc/tmtc/acs/imtq.py b/eive_tmtc/tmtc/acs/imtq.py index be2d2d3..b5c9b26 100644 --- a/eive_tmtc/tmtc/acs/imtq.py +++ b/eive_tmtc/tmtc/acs/imtq.py @@ -439,6 +439,8 @@ def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int): return handle_raw_mtm_measurement(printer, hk_data, False) elif set_id == ImtqSetId.RAW_MTM_WITH_TORQUE: return handle_raw_mtm_measurement(printer, hk_data, True) + elif set_id == ImtqSetId.DIPOLES: + return handle_dipole_set(printer, hk_data) elif set_id == ImtqSetId.STATUS_SET: return handle_status_set(printer, hk_data) else: @@ -455,6 +457,24 @@ def unpack_status_set(hk_data: bytes) -> List: return [status_mode, status_error, status_conf, status_uptime] +def handle_dipole_set(printer: FsfwTmTcPrinter, hk_data: bytes): + pw = PrintWrapper(printer) + pw.dlog("Received iMTQ dipole set") + fmt_str = "!hhhH" + fmt_len = struct.calcsize(fmt_str) + ( + dipole_x, + dipole_y, + dipole_z, + current_torque_duration + ) = struct.unpack(fmt_str, hk_data) + pw.dlog(f"Dipole X: {dipole_x}") + pw.dlog(f"Dipole Y: {dipole_y}") + pw.dlog(f"Dipole Z: {dipole_z}") + pw.dlog(f"Current torque duration: {current_torque_duration}") + pw.printer.print_validity_buffer(hk_data[fmt_len:], 2) + + def unpack_eng_hk(hk_data: bytes) -> List: digital_voltage = struct.unpack("!H", hk_data[0:2])[0] analog_voltage = struct.unpack("!H", hk_data[2:4])[0] From ed5cb87cada2d5d52bd6b319e0fc8cd1b63d8b55 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 8 Mar 2023 19:57:28 +0100 Subject: [PATCH 2/2] thats a large set --- eive_tmtc/pus_tm/hk_handling.py | 2 + eive_tmtc/tmtc/acs/imtq.py | 9 ++-- eive_tmtc/tmtc/acs/star_tracker.py | 78 +++++++++++++++++++++++++++++- 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/eive_tmtc/pus_tm/hk_handling.py b/eive_tmtc/pus_tm/hk_handling.py index 762a447..fc6b621 100644 --- a/eive_tmtc/pus_tm/hk_handling.py +++ b/eive_tmtc/pus_tm/hk_handling.py @@ -166,6 +166,8 @@ def handle_regular_hk_print( return handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data ) + elif objb == obj_ids.STAR_TRACKER_ID: + return handle_str_hk_data(set_id=set_id, hk_data=hk_data, printer=printer) elif objb == obj_ids.PLOC_SUPV_ID: return handle_supv_hk_data(set_id=set_id, hk_data=hk_data, printer=printer) elif objb == obj_ids.ACS_CONTROLLER: diff --git a/eive_tmtc/tmtc/acs/imtq.py b/eive_tmtc/tmtc/acs/imtq.py index b5c9b26..6c13a03 100644 --- a/eive_tmtc/tmtc/acs/imtq.py +++ b/eive_tmtc/tmtc/acs/imtq.py @@ -462,12 +462,9 @@ def handle_dipole_set(printer: FsfwTmTcPrinter, hk_data: bytes): pw.dlog("Received iMTQ dipole set") fmt_str = "!hhhH" fmt_len = struct.calcsize(fmt_str) - ( - dipole_x, - dipole_y, - dipole_z, - current_torque_duration - ) = struct.unpack(fmt_str, hk_data) + (dipole_x, dipole_y, dipole_z, current_torque_duration) = struct.unpack( + fmt_str, hk_data + ) pw.dlog(f"Dipole X: {dipole_x}") pw.dlog(f"Dipole Y: {dipole_y}") pw.dlog(f"Dipole Z: {dipole_z}") diff --git a/eive_tmtc/tmtc/acs/star_tracker.py b/eive_tmtc/tmtc/acs/star_tracker.py index 70c00fb..cafa2e2 100644 --- a/eive_tmtc/tmtc/acs/star_tracker.py +++ b/eive_tmtc/tmtc/acs/star_tracker.py @@ -5,11 +5,13 @@ @author J. Meier @date 14.08.2021 """ +import datetime import enum import logging import struct from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.pus_tm.defs import PrintWrapper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config.tmtc import tmtc_definitions_provider @@ -18,7 +20,7 @@ from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.util import ObjectIdU32 from eive_tmtc.utility.input_helper import InputHelper - +from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter _LOGGER = logging.getLogger(__name__) @@ -677,6 +679,80 @@ def get_upload_image() -> str: return image +def handle_str_hk_data(set_id: int, hk_data: bytes, printer: FsfwTmTcPrinter): + pw = PrintWrapper(printer) + pw.dlog(f"Received STR HK set with set ID {set_id}") + if set_id == SetId.SOLUTION: + handle_solution_set(hk_data, pw) + else: + _LOGGER.warning(f"HK parsing for Star Tracker set ID {set_id} unimplemented") + + +def handle_solution_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received solution set") + ticks_time_fmt = "!IQ" + current_idx = 0 + fmt_len = struct.calcsize(ticks_time_fmt) + (ticks, unix_time) = struct.unpack( + ticks_time_fmt, hk_data[current_idx : current_idx + fmt_len] + ) + unix_as_dt = datetime.datetime.fromtimestamp(unix_time, tz=datetime.timezone.utc) + pw.dlog(f"Ticks: {ticks} | UNIX time: {unix_time}") + pw.dlog(f"UNIX as datetime: {unix_as_dt}") + current_idx += fmt_len + calib_quaternions_fmt = "!ffff" + fmt_len = struct.calcsize(calib_quaternions_fmt) + (calib_q_w, calib_q_x, calib_q_y, calib_q_z) = struct.unpack( + calib_quaternions_fmt, hk_data[current_idx : current_idx + fmt_len] + ) + pw.dlog("Calibrated Quaternions") + pw.dlog(f"Quaternion w: {calib_q_w}") + pw.dlog(f"Quaternion x: {calib_q_x}") + pw.dlog(f"Quaternion y: {calib_q_y}") + pw.dlog(f"Quaternion z: {calib_q_z}") + current_idx += fmt_len + track_fmt = "!fffff" + fmt_len = struct.calcsize(track_fmt) + (track_confidence, track_q_w, track_q_x, track_q_y, track_q_z) = struct.unpack( + track_fmt, hk_data[current_idx : current_idx + fmt_len] + ) + pw.dlog(f"Track Confidence: {track_confidence}") + pw.dlog(f"Track QW: {track_q_w}") + pw.dlog(f"Track QX: {track_q_x}") + pw.dlog(f"Track QY: {track_q_y}") + pw.dlog(f"Track QZ: {track_q_z}") + current_idx += fmt_len + track_removed = hk_data[current_idx] + pw.dlog(f"Number of stars removed from tracking solution: {track_removed}") + current_idx += 1 + stars_centroided = hk_data[current_idx] + pw.dlog(f"Centroided stars: {stars_centroided}") + current_idx += 1 + stars_matched_database = hk_data[current_idx] + pw.dlog(f"Stars matched: {stars_matched_database}") + current_idx += 1 + # Result of LISA: Lost in space algorithm + lisa_fmt = "!fffffB" + fmt_len = struct.calcsize(track_fmt) + (lisa_q_w, lisa_q_x, lisa_q_y, lisa_q_z) = struct.unpack( + lisa_fmt, hk_data[current_idx : current_idx + fmt_len] + ) + pw.dlog(f"LISA QW: {lisa_q_w}") + pw.dlog(f"LISA QX: {lisa_q_x}") + pw.dlog(f"LISA QY: {lisa_q_y}") + pw.dlog(f"LISA QZ: {lisa_q_z}") + current_idx += fmt_len + is_trusworthy = hk_data[current_idx] + pw.dlog(f"Trustworthy solution: {is_trusworthy}") + current_idx += 1 + stable_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4]) + pw.dlog(f"Stable count: {stable_count}") + current_idx += 4 + solution_strategy = hk_data[current_idx] + pw.dlog(f"Solution strategy: {solution_strategy}") + pw.printer.print_validity_buffer(hk_data[current_idx:], 23) + + @tmtc_definitions_provider def add_str_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry()