diff --git a/eive_tmtc/pus_tm/action_reply_handler.py b/eive_tmtc/pus_tm/action_reply_handler.py index 661812a..49981bf 100644 --- a/eive_tmtc/pus_tm/action_reply_handler.py +++ b/eive_tmtc/pus_tm/action_reply_handler.py @@ -16,7 +16,7 @@ from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.tmtc.core import handle_core_ctrl_action_replies from eive_tmtc.tmtc.payload.ploc_mpsoc import handle_mpsoc_data_reply from eive_tmtc.tmtc.payload.ploc_supervisor import SupvActionId -from eive_tmtc.tmtc.acs.star_tracker import StarTrackerActionId +from eive_tmtc.tmtc.acs.star_tracker import handle_star_tracker_action_replies from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply from tmtccmd.tm import Service8FsfwTm from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter @@ -52,7 +52,7 @@ def handle_action_reply( elif object_id.as_bytes == CORE_CONTROLLER_ID: return handle_core_ctrl_action_replies(action_id, pw, custom_data) elif object_id.as_bytes == STAR_TRACKER_ID: - return handle_startracker_replies(action_id, pw, custom_data) + return handle_star_tracker_action_replies(action_id, pw, custom_data) elif object_id.as_bytes in [ ACU_HANDLER_ID, PDU_1_HANDLER_ID, @@ -89,20 +89,3 @@ def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: byt content_list = [struct.unpack("!H", custom_data[:2])[0]] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") - - -def handle_startracker_replies( - action_id: int, pw: PrintWrapper, custom_data: bytearray -): - if action_id == StarTrackerActionId.CHECKSUM: - if len(custom_data) != 5: - _LOGGER.warning( - "Star tracker reply has invalid length {0}".format(len(custom_data)) - ) - return - header_list = ["Checksum", "Checksum valid"] - print(custom_data[4]) - checksum_valid_flag = custom_data[4] >> 8 - content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] - pw.dlog(f"{header_list}") - pw.dlog(f"{content_list}") diff --git a/eive_tmtc/pus_tm/hk_handler.py b/eive_tmtc/pus_tm/hk_handler.py index 02b3b4e..ac40d80 100644 --- a/eive_tmtc/pus_tm/hk_handler.py +++ b/eive_tmtc/pus_tm/hk_handler.py @@ -1,5 +1,7 @@ """HK Handling for EIVE OBSW""" +import dataclasses import logging +from typing import List from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.internal_err_reporter import handle_ier_hk_data @@ -46,8 +48,18 @@ _LOGGER = logging.getLogger(__name__) FORWARD_SENSOR_TEMPS = False +@dataclasses.dataclass +class HkFilter: + object_ids: List[ObjectIdU32] + set_ids: List[int] + + def handle_hk_packet( - raw_tm: bytes, obj_id_dict: ObjectIdDictT, printer: FsfwTmTcPrinter, hk_level: int + raw_tm: bytes, + obj_id_dict: ObjectIdDictT, + printer: FsfwTmTcPrinter, + hk_filter: HkFilter, + hk_level: int, ): tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) @@ -56,17 +68,21 @@ def handle_hk_packet( if tm_packet.subservice == 25 or tm_packet.subservice == 26: hk_data = tm_packet.tm_data[8:] - printer.generic_hk_tm_print( - content_type=HkContentType.HK, - object_id=named_obj_id, - set_id=tm_packet.set_id, - hk_data=hk_data, - ) - + if named_obj_id in hk_filter.object_ids: + handle_regular_hk_print( + printer=printer, + object_id=named_obj_id, + hk_packet=tm_packet, + tm=tm_packet.pus_tm, + hk_data=hk_data, + ) + return try: - if hk_level == 1: - pass - elif hk_level > 1: + if hk_level >= 1: + printer.generic_hk_tm_print( + HkContentType.HK, named_obj_id, tm_packet.set_id, hk_data + ) + if hk_level >= 1: handle_regular_hk_print( printer=printer, object_id=named_obj_id, diff --git a/eive_tmtc/pus_tm/pus_demux.py b/eive_tmtc/pus_tm/pus_demux.py index 3495a0f..f178990 100644 --- a/eive_tmtc/pus_tm/pus_demux.py +++ b/eive_tmtc/pus_tm/pus_demux.py @@ -18,7 +18,7 @@ from .defs import PrintWrapper from .event_handler import handle_event_packet from .verification_handler import handle_service_1_fsfw_packet, generic_retval_printout -from .hk_handler import handle_hk_packet +from .hk_handler import handle_hk_packet, HkFilter from .action_reply_handler import handle_action_reply _LOGGER = logging.getLogger(__name__) @@ -30,6 +30,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here printer: FsfwTmTcPrinter, raw_logger: RawTmtcTimedLogWrapper, hk_level: int, + hk_filter: HkFilter, ): if len(packet) < 8: _LOGGER.warning("Detected packet shorter than 8 bytes!") @@ -50,7 +51,11 @@ def pus_factory_hook( # noqa C901 : Complexity okay here handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet) elif service == 3: handle_hk_packet( - printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level + printer=printer, + raw_tm=packet, + obj_id_dict=obj_id_dict, + hk_level=hk_level, + hk_filter=hk_filter, ) elif service == 5: handle_event_packet(raw_tm=packet, pw=pw) diff --git a/eive_tmtc/tmtc/acs/star_tracker.py b/eive_tmtc/tmtc/acs/star_tracker.py index 016f7fc..ae1790c 100644 --- a/eive_tmtc/tmtc/acs/star_tracker.py +++ b/eive_tmtc/tmtc/acs/star_tracker.py @@ -16,7 +16,14 @@ from eive_tmtc.utility.input_helper import InputHelper from spacepackets.ecss.tc import PusTelecommand from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry from tmtccmd.config.tmtc import tmtc_definitions_provider -from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_diag_command, make_sid +from tmtccmd.tc.pus_3_fsfw_hk import ( + create_request_one_diag_command, + create_request_one_hk_command, + enable_periodic_hk_command_with_interval, + disable_periodic_hk_command, + make_sid, +) +from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode from tmtccmd.util import ObjectIdU32 @@ -90,6 +97,9 @@ class StarTrackerActionId(enum.IntEnum): DEBUG_CAMERA = 83 FIRMWARE_UPDATE = 84 SET_TIME_FROM_SYS_TIME = 87 + ADD_SECONDARY_TM_TO_NORMAL_MODE = 95 + RESET_SECONDARY_TM_SET = 96 + READ_SECONDARY_TM_SET = 97 class OpCodes: @@ -98,8 +108,12 @@ class OpCodes: NORMAL = "nml" OFF = "off" PING = "ping" - REQUEST_SOLUTION_SET_HK = "hk_req_sol" - REQUEST_SOLUTION_SET_ACTION = "action_req_sol" + ONE_SHOOT_HK = "one_shot_hk" + ENABLE_HK = "enable_hk" + DISABLE_HK = "disable_hk" + ADD_SECONDARY_TM_TO_NORMAL_MODE = "add_secondary_tm" + RESET_SECONDARY_TM_SET = "reset_secondary_tm" + READ_SECONDARY_TM_SET = "read_secondary_tm" TAKE_IMAGE = "take_image" UPLOAD_IMAGE = "upload_image" SET_IMG_PROCESSOR_MODE = "set_img_proc_mode" @@ -108,8 +122,12 @@ class OpCodes: class Info: - REQUEST_SOLUTION_SET_HK = "Request Solution Set HK once" - REQUEST_SOLUTION_SET_ACTION = "Request Solution Set Action" + ONE_SHOOT_HK = "One shoot HK Set" + ENABLE_HK = "Enable Periodic HK" + DISABLE_HK = "Disable Periodic HK" + ADD_SECONDARY_TM_TO_NORMAL_MODE = "Add specific Dataset to secondary TM" + RESET_SECONDARY_TM_SET = "Reset secondary TM to Temperature Set only" + READ_SECONDARY_TM_SET = "Read list of secondary TM Sets" UPLOAD_IMAGE = "Upload Image" TAKE_IMAGE = "Take Image" SET_IMG_PROCESSOR_MODE = "Set Image Processor Mode" @@ -124,11 +142,24 @@ class SetId(enum.IntEnum): TEMPERATURE = 25 SOLUTION = 24 HISTOGRAM = 28 + CONTRAST = 29 CHECKSUM = 50 CAMERA = 67 LIMITS = 68 CENTROIDING = 72 LISA = 73 + AUTO_BLOB = 89 + MATCHED_CENTROIDS = 90 + BLOB = 91 + BLOBS = 92 + CENTROID = 93 + CENTROIDS = 94 + + +class DataSetRequest(enum.IntEnum): + ONESHOT = 0 + ENABLE = 1 + DISABLE = 2 class FileDefs: @@ -227,13 +258,15 @@ def pack_star_tracker_commands( # noqa C901 q.add_log_cmd("Star tracker: Mode Off") data = pack_mode_data(prompt_object_id_mode_cmd(), Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) - if op_code == OpCodes.REQUEST_SOLUTION_SET_HK: - q.add_log_cmd(Info.REQUEST_SOLUTION_SET_HK) - q.add_pus_tc(create_request_one_diag_command(make_sid(obyt, SetId.SOLUTION))) - if op_code == OpCodes.REQUEST_SOLUTION_SET_ACTION: - q.add_log_cmd("Star tracker: Request solution") - data = obyt + struct.pack("!I", StarTrackerActionId.REQ_SOLUTION) - q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) + if op_code == OpCodes.ONE_SHOOT_HK: + q.add_log_cmd(Info.ONE_SHOOT_HK) + request_dataset(q, DataSetRequest.ONESHOT) + if op_code == OpCodes.ENABLE_HK: + q.add_log_cmd(Info.ENABLE_HK) + request_dataset(q, DataSetRequest.ENABLE) + if op_code == OpCodes.DISABLE_HK: + q.add_log_cmd(Info.DISABLE_HK) + request_dataset(q, DataSetRequest.DISABLE) if op_code == "4": q.add_log_cmd("Star tracker: Mode Raw") data = pack_mode_data(obyt, Mode.RAW, 0) @@ -613,6 +646,79 @@ def pack_star_tracker_commands( # noqa C901 + firmware.encode() ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) + if op_code == OpCodes.ADD_SECONDARY_TM_TO_NORMAL_MODE: + q.add_log_cmd(Info.ADD_SECONDARY_TM_TO_NORMAL_MODE) + for val in SetId: + print("{:<2}: {:<20}".format(val, val.name)) + set_id = int(input("Specify the dataset \n" "")) + q.add_pus_tc( + create_action_cmd( + STAR_TRACKER_ID, + StarTrackerActionId.ADD_SECONDARY_TM_TO_NORMAL_MODE, + struct.pack("!I", set_id), + ) + ) + if op_code == OpCodes.RESET_SECONDARY_TM_SET: + q.add_log_cmd(Info.RESET_SECONDARY_TM_SET) + q.add_pus_tc( + create_action_cmd( + STAR_TRACKER_ID, + StarTrackerActionId.RESET_SECONDARY_TM_SET, + ) + ) + if op_code == OpCodes.READ_SECONDARY_TM_SET: + q.add_log_cmd(Info.READ_SECONDARY_TM_SET) + q.add_pus_tc( + create_action_cmd( + STAR_TRACKER_ID, StarTrackerActionId.READ_SECONDARY_TM_SET + ) + ) + + +def request_dataset(q: DefaultPusQueueHelper, req_type: DataSetRequest): + for val in SetId: + print("{:<2}: {:<20}".format(val, val.name)) + set_id = int(input("Specify the dataset \n" "")) + if set_id in [SetId.SOLUTION, SetId.TEMPERATURE]: + is_diag = True + else: + is_diag = False + match req_type: + case DataSetRequest.ONESHOT: + if is_diag: + q.add_pus_tc( + create_request_one_diag_command(make_sid(STAR_TRACKER_ID, set_id)) + ) + else: + q.add_pus_tc( + create_request_one_hk_command(make_sid(STAR_TRACKER_ID, set_id)) + ) + case DataSetRequest.ENABLE: + interval = float( + input("Please specify interval in floating point seconds: ") + ) + + if is_diag: + cmd_tuple = enable_periodic_hk_command_with_interval( + True, make_sid(STAR_TRACKER_ID, set_id), interval + ) + else: + cmd_tuple = enable_periodic_hk_command_with_interval( + False, make_sid(STAR_TRACKER_ID, set_id), interval + ) + q.add_pus_tc(cmd_tuple[0]) + q.add_pus_tc(cmd_tuple[1]) + case DataSetRequest.DISABLE: + if is_diag: + q.add_pus_tc( + disable_periodic_hk_command(True, make_sid(STAR_TRACKER_ID, set_id)) + ) + else: + q.add_pus_tc( + disable_periodic_hk_command( + False, make_sid(STAR_TRACKER_ID, set_id) + ) + ) def pack_read_command(object_id: bytes) -> bytearray: @@ -680,8 +786,24 @@ def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): pw.dlog(f"Received STR HK set with set ID {set_id}") if set_id == SetId.SOLUTION: handle_solution_set(hk_data, pw) + elif set_id == SetId.HISTOGRAM: + handle_histogram_set(hk_data, pw) elif set_id == SetId.TEMPERATURE: handle_temperature_set(hk_data, pw) + elif set_id == SetId.AUTO_BLOB: + handle_auto_blob_set(hk_data, pw) + elif set_id == SetId.MATCHED_CENTROIDS: + handle_matched_centroids_set(hk_data, pw) + elif set_id == SetId.BLOB: + handle_blob_set(hk_data, pw) + elif set_id == SetId.BLOBS: + handle_blobs_set(hk_data, pw) + elif set_id == SetId.CENTROID: + handle_centroid_set(hk_data, pw) + elif set_id == SetId.CENTROIDS: + handle_centroids_set(hk_data, pw) + elif set_id == SetId.CONTRAST: + handle_contrast_set(hk_data, pw) else: _LOGGER.warning(f"HK parsing for Star Tracker set ID {set_id} unimplemented") @@ -692,11 +814,14 @@ def unpack_time_hk(hk_data: bytes, current_idx: int, pw: PrintWrapper) -> int: (ticks, unix_time) = struct.unpack( ticks_time_fmt, hk_data[current_idx : current_idx + fmt_len] ) - unix_as_dt = datetime.datetime.fromtimestamp( - int(round(unix_time / 1e6)), tz=datetime.timezone.utc - ) - pw.dlog(f"Ticks: {ticks} | UNIX time: {unix_time}") - pw.dlog(f"UNIX as datetime: {unix_as_dt}") + try: + unix_as_dt = datetime.datetime.fromtimestamp( + int(round(unix_time / 1e6)), tz=datetime.timezone.utc + ) + pw.dlog(f"Ticks: {ticks} | UNIX time: {unix_time}") + pw.dlog(f"UNIX as datetime: {unix_as_dt}") + except ValueError as e: + _LOGGER.exception(e) current_idx += fmt_len return current_idx @@ -792,6 +917,234 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper): FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23) +def handle_blob_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Blob Set") + if len(hk_data) < 14: + _LOGGER.warning(f"Blob dataset HK data with length {len(hk_data)} too short") + return + current_idx = unpack_time_hk(hk_data, 0, pw) + blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] + pw.dlog(f"Blob count: {blob_count}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx + 4 :], num_vars=3) + + +def handle_blobs_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Blobs Set") + if len(hk_data) < 6 + 2 * 2 * 8: + _LOGGER.warning(f"Blobs dataset HK data with length {len(hk_data)} too short") + return + current_idx = unpack_time_hk(hk_data, 0, pw) + fmt_str = "!HHH" + inc_len = struct.calcsize(fmt_str) + count, count_used, nr_4lines_skipped = struct.unpack( + fmt_str, hk_data[current_idx : current_idx + inc_len] + ) + current_idx += inc_len + pw.dlog( + f"Count {count} | Count Used {count_used} | Number of skipped 4lines {nr_4lines_skipped}" + ) + fmt_coords = "!HHHHHHHH" + inc_len = struct.calcsize(fmt_coords) + x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog("{:<8} {:<8}".format("X", "Y")) + for idx in range(8): + pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx])) + assert current_idx == len(hk_data) - 1 + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=7) + + +def handle_centroid_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Centroid Set") + if len(hk_data) < 14: + raise ValueError( + f"Centroid dataset HK data with length {len(hk_data)} too short" + ) + current_idx = unpack_time_hk(hk_data, 0, pw) + centroid_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] + current_idx += 4 + pw.dlog(f"Centroid count: {centroid_count}") + assert current_idx == len(hk_data) - 1 + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) + + +def handle_centroids_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Centroids Set") + current_idx = unpack_time_hk(hk_data, 0, pw) + centroids_count = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] + current_idx += 2 + pw.dlog(f"Centroids count: {centroids_count}") + fmt_coords = "!ffffffffffffffff" + inc_len = struct.calcsize(fmt_coords) + x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + fmt_coords = "!BBBBBBBBBBBBBBBB" + inc_len = struct.calcsize(fmt_coords) + magnitude = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog("{:<8} {:<8} {:<8} {:<8}".format("Index", "X", "Y", "Magnitude")) + for idx in range(16): + pw.dlog( + "{:<8} {:<8.3f} {:<8.3f} {:<8}".format( + idx, x_coords[idx], y_coords[idx], magnitude[idx] + ) + ) + assert current_idx == len(hk_data) - 1 + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6) + + +def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Matched Centroids Set") + if len(hk_data) < 4 + 8 + 1 + 4 * 16 * 5: + raise ValueError( + f"Matched Centroids dataset HK data with length {len(hk_data)} too short. Expected 333 bytes." + ) + current_idx = unpack_time_hk(hk_data, 0, pw) + num_matched_centroids = hk_data[current_idx] + current_idx += 1 + pw.dlog(f"Number of matched centroids {num_matched_centroids}") + fmt_ids = "!IIIIIIIIIIIIIIII" + inc_len = struct.calcsize(fmt_ids) + star_id = struct.unpack(fmt_ids, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + fmt_floats = "!ffffffffffffffff" + inc_len = struct.calcsize(fmt_floats) + x_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + x_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + y_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) + current_idx += inc_len + pw.dlog( + "{:<8} {:<10} {:<10} {:<10} {:<10} {:<10}".format( + "Index", "Star ID", "X", "Y", "X Error", "Y Error" + ) + ) + for idx in range(16): + pw.dlog( + "{:<8} {:<10} {:<10.3f} {:<10.3f} {:<10.3f} {:<10.3f}".format( + idx, + star_id[idx], + x_coords[idx], + y_coords[idx], + x_errors[idx], + y_errors[idx], + ) + ) + assert current_idx == len(hk_data) - 1 + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8) + + +def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper): + pw.dlog("Received Auto Blob Set") + if len(hk_data) < 4 + 8 + 4: + raise ValueError( + f"Matched Centroids dataset HK data with length {len(hk_data)} too short. Expected 16 bytes." + ) + current_idx = unpack_time_hk(hk_data, 0, pw) + fmt_threshold = "!f" + inc_len = struct.calcsize(fmt_threshold) + threshold = struct.unpack( + fmt_threshold, hk_data[current_idx : current_idx + inc_len] + )[0] + current_idx += inc_len + assert current_idx == len(hk_data) - 1 + pw.dlog(f"Threshold {threshold}") + FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) + + +def handle_histo_or_contrast_set(name: str, hk_data: bytes, pw: PrintWrapper): + pw.dlog(f"Received {name} Set") + current_idx = unpack_time_hk(hk_data, 0, pw) + fmt_str = "!IIIIIIIII" + bins_list = [] + inc_len = struct.calcsize(fmt_str) + a_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + bins_list.append(a_bins) + current_idx += inc_len + b_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + bins_list.append(b_bins) + current_idx += inc_len + c_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + bins_list.append(c_bins) + current_idx += inc_len + d_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) + bins_list.append(d_bins) + pw.dlog( + f"{name} Sections: A Upper Left | B Upper Right | C Lower Left | D Lower Right" + ) + pw.dlog("{:<12} {:<10} {:<10} {:<10} {:<10}".format("Range", "A", "B", "C", "D")) + for idx in range(9): + if idx == 0: + val_range = "0 (0-0)" + elif idx == 1: + val_range = "1 (1-1)" + else: + val_range = f"{idx} ({pow(2, idx - 1)}-{pow(2, idx) - 1})" + pw.dlog( + "{:<12} {:<10} {:<10} {:<10} {:<10}".format( + val_range, + bins_list[0][idx], + bins_list[1][idx], + bins_list[2][idx], + bins_list[3][idx], + ) + ) + + +def handle_histogram_set(hk_data: bytes, pw: PrintWrapper): + handle_histo_or_contrast_set("Histogram", hk_data, pw) + + +def handle_contrast_set(hk_data: bytes, pw: PrintWrapper): + handle_histo_or_contrast_set("Contrast", hk_data, pw) + + +def handle_star_tracker_action_replies( + action_id: int, pw: PrintWrapper, custom_data: bytes +): + if action_id == StarTrackerActionId.CHECKSUM: + handle_checksum(pw, custom_data) + elif action_id == StarTrackerActionId.READ_SECONDARY_TM_SET: + handle_read_secondary_tm_set(pw, custom_data) + + +def handle_checksum(pw: PrintWrapper, custom_data: bytes): + if len(custom_data) != 5: + _LOGGER.warning( + "Star tracker reply has invalid length {0}".format(len(custom_data)) + ) + return + header_list = ["Checksum", "Checksum valid"] + print(custom_data[4]) + checksum_valid_flag = custom_data[4] >> 8 + content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] + pw.dlog(f"{header_list}") + pw.dlog(f"{content_list}") + + +def handle_read_secondary_tm_set(pw: PrintWrapper, custom_data: bytes): + pw.dlog("Received secondary TM Sets") + if len(custom_data) % 4 != 0: + raise ValueError(f"Received data of unexpected length {len(custom_data)}") + data_length = int(len(custom_data) / 4) + fmt_str = "!" + "I" * data_length + inc_len = struct.calcsize(fmt_str) + set_ids = struct.unpack(fmt_str, custom_data[:inc_len]) + pw.dlog("The following Datasets are currently Part of the secondary TM list") + for set_id in set_ids: + if set_id in SetId._value2member_map_: + pw.dlog(SetId(set_id).name) + else: + pw.dlog(f"Unknown Set ID {set_id}") + + @tmtc_definitions_provider def add_str_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() @@ -799,62 +1152,65 @@ def add_str_cmds(defs: TmtcDefinitionWrapper): oce.add(OpCodes.ON_FIRMWARE, "Mode On, Submode Firmware") oce.add(OpCodes.NORMAL, "Mode Normal") oce.add(OpCodes.OFF, "Mode Off") - # oce.add("4", "Star Tracker: Mode Raw") oce.add(OpCodes.PING, "Star Tracker: Ping") oce.add(OpCodes.TAKE_IMAGE, "Take Image") - oce.add(OpCodes.REQUEST_SOLUTION_SET_HK, Info.REQUEST_SOLUTION_SET_HK) - oce.add(OpCodes.REQUEST_SOLUTION_SET_ACTION, Info.REQUEST_SOLUTION_SET_ACTION) + oce.add(OpCodes.ONE_SHOOT_HK, Info.ONE_SHOOT_HK) + oce.add(OpCodes.ENABLE_HK, Info.ENABLE_HK) + oce.add(OpCodes.DISABLE_HK, Info.DISABLE_HK) oce.add(OpCodes.UPLOAD_IMAGE, Info.UPLOAD_IMAGE) oce.add(OpCodes.SET_IMG_PROCESSOR_MODE, Info.SET_IMG_PROCESSOR_MODE) - oce.add("6", "Star Tracker: Switch to bootloader program") - oce.add("7", "Star Tracker: Request temperature") - oce.add("8", "Star Tracker: Request version") - oce.add("9", "Star Tracker: Request interface") - oce.add("10", "Star Tracker: Request power") - oce.add("11", "Star Tracker: Set subscription parameters") - oce.add("12", "Star Tracker: Boot image (requires bootloader mode)") - oce.add("13", "Star Tracker: Request time") - oce.add("14", "Star Tracker: Request solution") - oce.add("16", "Star Tracker: Download image") - oce.add("17", "Star Tracker: Set limit parameters") - oce.add("17", "Star Tracker: Set limit parameters") - oce.add("18", "Star Tracker: Set tracking parameters") - oce.add("19", "Star Tracker: Set mounting parameters") - oce.add("20", "Star Tracker: Set camera parameters") - oce.add("22", "Star Tracker: Set centroiding parameters") - oce.add("23", "Star Tracker: Set LISA parameters") - oce.add("24", "Star Tracker: Set matching parameters") - oce.add("25", "Star Tracker: Set validation parameters") - oce.add("26", "Star Tracker: Set algo parameters") - oce.add("28", "Star Tracker: Stop str helper") - oce.add("30", "Star Tracker: Set name of download image") - oce.add("31", "Star Tracker: Request histogram") - oce.add("32", "Star Tracker: Request contrast") - oce.add("33", "Star Tracker: Set json filename") - oce.add("35", "Star Tracker: Flash read") - oce.add("36", "Star Tracker: Set flash read filename") - oce.add("37", "Star Tracker: Get checksum") - oce.add("49", "Star Tracker: Request camera parameter") - oce.add("50", "Star Tracker: Request limits") - oce.add("52", "Star Tracker: (EGSE only) Load camera ground config") - oce.add("53", "Star Tracker: (EGSE only) Load camera flight config") - oce.add("54", "Star Tracker: Request log level parameters") - oce.add("55", "Star Tracker: Request mounting parameters") - oce.add("56", "Star Tracker: Request image processor parameters") - oce.add("57", "Star Tracker: Request centroiding parameters") - oce.add("58", "Star Tracker: Request lisa parameters") - oce.add("59", "Star Tracker: Request matching parameters") - oce.add("60", "Star Tracker: Request tracking parameters") - oce.add("61", "Star Tracker: Request validation parameters") - oce.add("62", "Star Tracker: Request algo parameters") - oce.add("63", "Star Tracker: Request subscription parameters") - oce.add("64", "Star Tracker: Request log subscription parameters") - oce.add("65", "Star Tracker: Request debug camera parameters") - oce.add("66", "Star Tracker: Set log level parameters") - oce.add("67", "Star Tracker: Set log subscription parameters") - oce.add("68", "Star Tracker: Set debug camera parameters") + oce.add( + OpCodes.ADD_SECONDARY_TM_TO_NORMAL_MODE, Info.ADD_SECONDARY_TM_TO_NORMAL_MODE + ) + oce.add(OpCodes.READ_SECONDARY_TM_SET, Info.READ_SECONDARY_TM_SET) + oce.add(OpCodes.RESET_SECONDARY_TM_SET, Info.RESET_SECONDARY_TM_SET) + # oce.add("6", "Star Tracker: Switch to bootloader program") + # oce.add("7", "Star Tracker: Request temperature") + # oce.add("8", "Star Tracker: Request version") + # oce.add("9", "Star Tracker: Request interface") + # oce.add("10", "Star Tracker: Request power") + # oce.add("11", "Star Tracker: Set subscription parameters") + # oce.add("12", "Star Tracker: Boot image (requires bootloader mode)") + # oce.add("13", "Star Tracker: Request time") + # oce.add("14", "Star Tracker: Request solution") + # oce.add("16", "Star Tracker: Download image") + # oce.add("17", "Star Tracker: Set limit parameters") + # oce.add("17", "Star Tracker: Set limit parameters") + # oce.add("18", "Star Tracker: Set tracking parameters") + # oce.add("19", "Star Tracker: Set mounting parameters") + # oce.add("20", "Star Tracker: Set camera parameters") + # oce.add("22", "Star Tracker: Set centroiding parameters") + # oce.add("23", "Star Tracker: Set LISA parameters") + # oce.add("24", "Star Tracker: Set matching parameters") + # oce.add("25", "Star Tracker: Set validation parameters") + # oce.add("26", "Star Tracker: Set algo parameters") + # oce.add("28", "Star Tracker: Stop str helper") + # oce.add("30", "Star Tracker: Set name of download image") + # oce.add("31", "Star Tracker: Request histogram") + # oce.add("32", "Star Tracker: Request contrast") + # oce.add("33", "Star Tracker: Set json filename") + # oce.add("35", "Star Tracker: Flash read") + # oce.add("36", "Star Tracker: Set flash read filename") + # oce.add("37", "Star Tracker: Get checksum") + # oce.add("49", "Star Tracker: Request camera parameter") + # oce.add("50", "Star Tracker: Request limits") + # oce.add("52", "Star Tracker: (EGSE only) Load camera ground config") + # oce.add("53", "Star Tracker: (EGSE only) Load camera flight config") + # oce.add("54", "Star Tracker: Request log level parameters") + # oce.add("55", "Star Tracker: Request mounting parameters") + # oce.add("56", "Star Tracker: Request image processor parameters") + # oce.add("57", "Star Tracker: Request centroiding parameters") + # oce.add("58", "Star Tracker: Request lisa parameters") + # oce.add("59", "Star Tracker: Request matching parameters") + # oce.add("60", "Star Tracker: Request tracking parameters") + # oce.add("61", "Star Tracker: Request validation parameters") + # oce.add("62", "Star Tracker: Request algo parameters") + # oce.add("63", "Star Tracker: Request subscription parameters") + # oce.add("64", "Star Tracker: Request log subscription parameters") + # oce.add("65", "Star Tracker: Request debug camera parameters") + # oce.add("66", "Star Tracker: Set log level parameters") + # oce.add("67", "Star Tracker: Set log subscription parameters") + # oce.add("68", "Star Tracker: Set debug camera parameters") oce.add(OpCodes.FW_UPDATE, Info.FW_UPDATE) - oce.add("70", "Star Tracker: Disable timestamp generation") - oce.add("71", "Star Tracker: Enable timestamp generation") oce.add(OpCodes.SET_TIME_FROM_SYS_TIME, Info.SET_TIME_FROM_SYS_TIME) defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce) diff --git a/tmtcc.py b/tmtcc.py index ea2afa4..44eaede 100755 --- a/tmtcc.py +++ b/tmtcc.py @@ -15,7 +15,9 @@ from spacepackets.cfdp import ( TransmissionMode, ) +from eive_tmtc.config.object_ids import STAR_TRACKER_ID from eive_tmtc.pus_tc.tc_handler import TcHandler +from eive_tmtc.pus_tm.hk_handler import HkFilter from tmtccmd.logging import add_colorlog_console_logger from tmtccmd.cfdp.handler import CfdpInCcsdsHandler from tmtccmd.cfdp.mib import ( @@ -24,7 +26,7 @@ from tmtccmd.cfdp.mib import ( RemoteEntityCfg, ) from tmtccmd import BackendBase -from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider +from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider, ObjectIdU32 from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.logging.pus import ( @@ -77,12 +79,19 @@ class PusHandler(SpecificApidHandlerBase): self.verif_wrapper = wrapper self.raw_logger = raw_logger self.hk_level = hk_level + self.these_objs_hk_only = [] + self.hk_filter = HkFilter(object_ids=self.these_objs_hk_only, set_ids=[]) def handle_tm(self, packet: bytes, _user_args: any): # with open("tc.bin", "wb") as of: # of.write(packet) pus_factory_hook( - packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level + packet, + self.verif_wrapper, + self.printer, + self.raw_logger, + self.hk_level, + self.hk_filter, )