# -*- coding: utf-8 -*-
"""
@file   star_tracker.py
@brief  Star tracker commanding
@author J. Meier
@date   14.08.2021
"""
import datetime
import enum
import logging
import struct

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.utility.input_helper import InputHelper
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.pus.tc.s3_fsfw_hk import (
    create_request_one_diag_command,
    create_request_one_hk_command,
    enable_periodic_hk_command_with_interval,
    disable_periodic_hk_command,
    make_sid,
)
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter

from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID

_LOGGER = logging.getLogger(__name__)


class StarTrackerActionId(enum.IntEnum):
    """Numeric action IDs used when building star tracker action commands.

    The members are not sorted by value; the definition order is kept as is.
    """

    PING = 0
    BOOT = 1
    REQ_VERSION = 2
    REQ_INTERFACE = 3
    REQ_TIME = 4
    UNLOCK = 6
    SWITCH_TO_BOOTLOADER_PROGRAM = 7
    REQ_POWER = 11
    TAKE_IMAGE = 15
    DOWNLOAD_IMAGE = 9
    UPLOAD_IMAGE = 10
    DOWNLOAD_CENTROID = 16
    UPLOAD_CENTROID = 17
    SUBSCRIPTION = 18
    IMAGE_PROCESSOR = 19
    REQ_SOLUTION = 24
    REQ_TEMPERATURE = 25
    REQ_HISTOGRAM = 28
    REQ_CONTRAST = 29
    LIMITS = 40
    MOUNTING = 41
    CAMERA = 42
    BLOB = 43
    CENTROIDING = 44
    LISA = 45
    MATCHING = 46
    TRACKING = 47
    VALIDATION = 48
    ALGO = 49
    CHECKSUM = 50
    FLASH_READ = 51
    FLASH_WRITE = 52
    DOWNLOAD_MATCHED_STAR = 53
    STOP_STR_HELPER = 55
    RESET_ERROR = 56
    CHANGE_DOWNLOAD_IMAGE = 57
    SET_JSON_FILE_NAME = 58
    SET_FLASH_READ_FILENAME = 59
    DOWNLOAD_DBIMAGE = 61
    DOWNLOAD_BLOBPIXEL = 62
    DOWNLOAD_FPGA_IMAGE = 63
    CHANGE_FPGA_DOWNLOAD_FILE = 64
    UPLOAD_FPGA_IMAGE = 65
    FPGA_ACTION = 66
    REQ_CAMERA_PARAMS = 67
    REQ_LIMITS = 68
    REQ_LOG_LEVEL = 69
    REQ_MOUNTING = 70
    REQ_IMAGE_PROCESSOR = 71
    REQ_CENTROIDING = 72
    REQ_LISA = 73
    REQ_MATCHING = 74
    REQ_TRACKING = 75
    REQ_VALIDATION = 76
    REQ_ALGO = 77
    REQ_SUBSCRIPTION = 78
    REQ_LOG_SUBSCRIPTION = 79
    REQ_DEBUG_CAMERA = 80
    LOGLEVEL = 81
    LOG_SUBSCRIPTION = 82
    DEBUG_CAMERA = 83
    FIRMWARE_UPDATE = 84
    SET_TIME_FROM_SYS_TIME = 87
    ADD_SECONDARY_TM_TO_NORMAL_MODE = 95
    RESET_SECONDARY_TM_SET = 96
    READ_SECONDARY_TM_SET = 97
    RELOAD_JSON_CFG_FILE = 100


class OpCode:
    """Named op code strings which select the command to generate."""

    ON_BOOTLOADER = "on_bootloader"
    ON_FIRMWARE = "on_firmware"
    NORMAL = "nml"
    OFF = "off"
    PING = "ping"
    ONE_SHOOT_HK = "one_shot_hk"
    ENABLE_HK = "enable_hk"
    DISABLE_HK = "disable_hk"
    ADD_SECONDARY_TM_TO_NORMAL_MODE = "add_secondary_tm"
    RESET_SECONDARY_TM_SET = "reset_secondary_tm"
    READ_SECONDARY_TM_SET = "read_secondary_tm"
    TAKE_IMAGE = "take_image"
    UPLOAD_IMAGE = "upload_image"
    DOWNLOAD_IMAGE = "download_image"
    SET_IMG_PROCESSOR_MODE = "set_img_proc_mode"
    FW_UPDATE = "fw_update"
    SET_TIME_FROM_SYS_TIME = "set_time"
    RELOAD_JSON_CFG_FILE = "reload_json_cfg"


class Info:
    """Human-readable descriptions belonging to the named op codes."""

    ONE_SHOOT_HK = "One shoot HK Set"
    ENABLE_HK = "Enable Periodic HK"
    DISABLE_HK = "Disable Periodic HK"
    ADD_SECONDARY_TM_TO_NORMAL_MODE = "Add specific Dataset to secondary TM"
    RESET_SECONDARY_TM_SET = "Reset secondary TM to Temperature Set only"
    READ_SECONDARY_TM_SET = "Read list of secondary TM Sets"
    UPLOAD_IMAGE = "Upload Optical Image"
    DOWNLOAD_IMAGE = "Download Optical Image"
    TAKE_IMAGE = "Take Image"
    SET_IMG_PROCESSOR_MODE = "Set Image Processor Mode"
    FW_UPDATE = "Firmware Update"
    SET_TIME_FROM_SYS_TIME = "Set time from system time"
    RELOAD_JSON_CFG_FILE = "Reload JSON configuration file. Reboot still required."
class SetId(enum.IntEnum): VERSION = 2 INTERFACE = 3 POWER = 11 TEMPERATURE = 25 SOLUTION = 24 HISTOGRAM = 28 CONTRAST = 29 CHECKSUM = 50 CAMERA = 67 LIMITS = 68 CENTROIDING = 72 LISA = 73 AUTO_BLOB = 89 MATCHED_CENTROIDS = 90 BLOB = 91 BLOBS = 92 CENTROID = 93 CENTROIDS = 94 class DataSetRequest(enum.IntEnum): ONESHOT = 0 ENABLE = 1 DISABLE = 2 class FileDefs: download_path = "/mnt/sd0/startracker" json_file = "/mnt/sd0/startracker/full.json" egse_ground_config = "/home/pi/arcsec/json/ground-config.json" egse_flight_config = "/home/pi/arcsec/json/flight-config.json" egse_solution_upload_img_config = "/home/pi/arcsec/json/upload-image-solution.json" egse_histogram_upload_img_config = ( "/home/pi/arcsec/json/upload-image-histogram.json" ) q7s_ground_config = "/mnt/sd0/startracker/ground-config.json" q7s_flight_config = "/mnt/sd0/startracker/flight-config.json" firmware2_1 = "/home/pi/arcsec/firmware/sagitta-2-1.bin" firmware22_1 = "/home/pi/arcsec/firmware/sagitta-22-1.bin" firmware_origin = "/home/pi/arcsec/firmware/sagitta-origin.bin" FW_SLOT_Q7S = "/mnt/sd0/startracker/updates/sagitta-update.bin" json_dict = { "1": ("Q7S flight config", FileDefs.q7s_flight_config), "2": ("Q7S ground config", FileDefs.q7s_ground_config), "3": ("EGSE flight config", FileDefs.egse_flight_config), "4": ("EGSE ground config", FileDefs.egse_ground_config), "5": ( "EGSE get solution, upload image config", FileDefs.egse_solution_upload_img_config, ), "6": ( "EGSE get histogram, upload image config", FileDefs.egse_solution_upload_img_config, ), } FW_DICT = { "0": ("Firmware Update Q7S", FileDefs.FW_SLOT_Q7S), "1": ("Firmware Major = 2, Minor = 1", FileDefs.firmware2_1), "2": ("Firmware Major = 22, Minor = 1", FileDefs.firmware22_1), "3": ("Firmware Origin", FileDefs.firmware_origin), } UPLOAD_IMAGE_DICT = { "0": ("custom path", "Custom Path"), "1": ("q7s gemma", "/mnt/sd0/startracker/gemma.bin"), "2": ("egse gemma", "/home/pi/arcsec/star-images/gemma.bin"), "3": ("q7s polaris", 
"/mnt/sd0/startracker/polaris.bin"), "4": ("egse polaris", "/home/pi/arcsec/star-images/polaris.bin"), } class StartRegion: # Definition according to datasheet (which turned out to be partially faulty) BOOTLOADER = 0 STAR_TRACKER_FIRMWARE = 1 class PartitionSize: # Size of most recent firmware image STAR_TRACKER_FIRMWARE = 464572 class Submode(enum.IntEnum): DEFAULT = 0 BOOTLOADER = 1 FIRMWARE = 2 def prompt_object_id_mode_cmd() -> bytes: cmd_assy = input("Command Assembly (0) or Device Handler (1) ?: ") if cmd_assy == "0": return STR_ASSEMBLY else: return STAR_TRACKER_ID def pack_star_tracker_commands( # noqa C901 object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str ): q.add_log_cmd( f"Generate command for star tracker with object id: {object_id.as_hex_string}" ) obyt = object_id.as_bytes if op_code == OpCode.ON_BOOTLOADER: q.add_log_cmd("Star tracker: Mode On, Submode Bootloader") data = pack_mode_data(prompt_object_id_mode_cmd(), Mode.ON, Submode.BOOTLOADER) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == OpCode.ON_FIRMWARE: q.add_log_cmd("Star tracker: Mode On, Submode Firmware") data = pack_mode_data(prompt_object_id_mode_cmd(), Mode.ON, Submode.FIRMWARE) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == OpCode.NORMAL: q.add_log_cmd("Star tracker: Mode Normal") data = pack_mode_data(prompt_object_id_mode_cmd(), Mode.NORMAL, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == OpCode.OFF: q.add_log_cmd("Star tracker: Mode Off") data = pack_mode_data(prompt_object_id_mode_cmd(), Mode.OFF, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == OpCode.ONE_SHOOT_HK: q.add_log_cmd(Info.ONE_SHOOT_HK) request_dataset(q, DataSetRequest.ONESHOT) if op_code == OpCode.ENABLE_HK: q.add_log_cmd(Info.ENABLE_HK) request_dataset(q, DataSetRequest.ENABLE) if op_code == OpCode.DISABLE_HK: q.add_log_cmd(Info.DISABLE_HK) 
request_dataset(q, DataSetRequest.DISABLE) if op_code == "4": q.add_log_cmd("Star tracker: Mode Raw") data = pack_mode_data(obyt, Mode.RAW, 0) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data)) if op_code == OpCode.PING: q.add_log_cmd("Star tracker: Ping") data = obyt + struct.pack("!I", StarTrackerActionId.PING) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "6": q.add_log_cmd("Star tracker: Switch to bootloader program") data = obyt + struct.pack( "!I", StarTrackerActionId.SWITCH_TO_BOOTLOADER_PROGRAM ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "7": q.add_log_cmd("Star tracker: Temperature request") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_TEMPERATURE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "8": q.add_log_cmd("Star tracker: Request version") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_VERSION) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "9": q.add_log_cmd("Star tracker: Request interface") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_INTERFACE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "10": q.add_log_cmd("Star tracker: Request power") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_POWER) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "11": q.add_log_cmd("Star tracker: Set subscription parameters") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.SUBSCRIPTION) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "12": q.add_log_cmd("Star tracker: Boot") data = obyt + struct.pack("!I", StarTrackerActionId.BOOT) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "13": q.add_log_cmd("Star tracker: Request time") data = obyt 
+ struct.pack("!I", StarTrackerActionId.REQ_TIME) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.UPLOAD_IMAGE: q.add_log_cmd("Star tracker: Upload image") image = get_upload_image() data = ( obyt + struct.pack("!I", StarTrackerActionId.UPLOAD_IMAGE) + bytearray(image, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.DOWNLOAD_IMAGE: q.add_log_cmd(f"STR: {Info.DOWNLOAD_IMAGE}") path = input("Specify storage location (default - /mnt/sd0/startracker): ") if not path: path = FileDefs.download_path data = ( obyt + struct.pack("!I", StarTrackerActionId.DOWNLOAD_IMAGE) + bytearray(path, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "17": q.add_log_cmd("Star tracker: Set limits") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.LIMITS) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "18": q.add_log_cmd("Star tracker: Set tracking parameters") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.TRACKING) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "19": q.add_log_cmd("Star tracker: Mounting") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.MOUNTING) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "20": q.add_log_cmd("Star tracker: Camera") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.CAMERA) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "22": q.add_log_cmd("Star tracker: Centroiding") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.CENTROIDING) + bytearray(json_file, 
"utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "23": q.add_log_cmd("Star tracker: LISA") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.LISA) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "24": q.add_log_cmd("Star tracker: Matching") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.MATCHING) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "25": q.add_log_cmd("Star tracker: Validation") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.VALIDATION) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "26": q.add_log_cmd("Star tracker: Algo") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.ALGO) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.TAKE_IMAGE: q.add_log_cmd("Star tracker: Take image") actionid = int( input("Specify parameter ID (4: take image, 7: get histogram): ") ) data = ( obyt + struct.pack("!I", StarTrackerActionId.TAKE_IMAGE) + struct.pack("!B", actionid) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "28": q.add_log_cmd("Star tracker: Stop str helper") data = obyt + struct.pack("!I", StarTrackerActionId.STOP_STR_HELPER) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "30": q.add_log_cmd("Star tracker: Set name of download image") filename = input("Specify download image name: ") data = ( obyt + struct.pack("!I", StarTrackerActionId.CHANGE_DOWNLOAD_IMAGE) + bytearray(filename, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "31": q.add_log_cmd("Star tracker: 
Request histogram") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_HISTOGRAM) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "32": q.add_log_cmd("Star tracker: Request contrast") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_CONTRAST) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "33": q.add_log_cmd("Star tracker: Set json filename") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.SET_JSON_FILE_NAME) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "35": q.add_log_cmd("Star tracker: Flash read") data = pack_read_command(obyt) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "36": q.add_log_cmd("Star tracker: Set flash read filename") filename = input("Specify filename: ") data = ( obyt + struct.pack("!I", StarTrackerActionId.SET_FLASH_READ_FILENAME) + bytearray(filename, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "37": q.add_log_cmd("Star tracker: Get checksum") data = pack_checksum_command(obyt) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.SET_TIME_FROM_SYS_TIME: q.add_log_cmd(Info.SET_TIME_FROM_SYS_TIME) data = obyt + struct.pack("!I", StarTrackerActionId.SET_TIME_FROM_SYS_TIME) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "39": q.add_log_cmd("Star tracker: Download Centroid") id = 0 data = ( obyt + struct.pack("!I", StarTrackerActionId.DOWNLOAD_CENTROID) + struct.pack("!B", id) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "41": q.add_log_cmd("Star tracker: Download matched star") id = 0 data = ( obyt + struct.pack("!I", StarTrackerActionId.DOWNLOAD_MATCHED_STAR) + struct.pack("!B", id) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, 
app_data=data)) if op_code == "42": q.add_log_cmd("Star tracker: Download DB Image") id = 0 data = ( obyt + struct.pack("!I", StarTrackerActionId.DOWNLOAD_DBIMAGE) + struct.pack("!B", id) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "43": q.add_log_cmd("Star tracker: Download Blob Pixel") id = 0 type = 1 # 0 - normal, 1 - fast data = ( obyt + struct.pack("!I", StarTrackerActionId.DOWNLOAD_BLOBPIXEL) + struct.pack("!B", id) + struct.pack("!B", type) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "47": q.add_log_cmd("Star tracker: FPGA action") id = 3 data = ( obyt + struct.pack("!I", StarTrackerActionId.FPGA_ACTION) + struct.pack("!B", id) ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "48": q.add_log_cmd("Star tracker: Unlock") data = obyt + struct.pack("!I", StarTrackerActionId.UNLOCK) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "49": q.add_log_cmd("Star tracker: Request camera parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_CAMERA_PARAMS) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "50": q.add_log_cmd("Star tracker: Request limits") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_LIMITS) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.SET_IMG_PROCESSOR_MODE: q.add_log_cmd(Info.SET_IMG_PROCESSOR_MODE) json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.IMAGE_PROCESSOR) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "52": q.add_log_cmd("Star tracker: EGSE load ground config camera parameters") data = ( obyt + struct.pack("!I", StarTrackerActionId.CAMERA) + bytearray(FileDefs.egse_ground_config, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == 
"53": q.add_log_cmd("Star tracker: EGSE load flight config camera parameters") data = ( obyt + struct.pack("!I", StarTrackerActionId.CAMERA) + bytearray(FileDefs.egse_flight_config, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "54": q.add_log_cmd("Star tracker: Request log level parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_LOG_LEVEL) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "55": q.add_log_cmd("Star tracker: Request mounting parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_MOUNTING) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "56": q.add_log_cmd("Star tracker: Request image processor parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_IMAGE_PROCESSOR) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "57": q.add_log_cmd("Star tracker: Request centroiding parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_CENTROIDING) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "58": q.add_log_cmd("Star tracker: Request lisa parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_LISA) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "59": q.add_log_cmd("Star tracker: Request matching parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_MATCHING) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "60": q.add_log_cmd("Star tracker: Request tracking parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_TRACKING) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "61": q.add_log_cmd("Star tracker: Request validation parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_VALIDATION) q.add_pus_tc(PusTelecommand(service=8, subservice=128, 
app_data=data)) if op_code == "62": q.add_log_cmd("Star tracker: Request algo parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_ALGO) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "63": q.add_log_cmd("Star tracker: Request subscription parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_SUBSCRIPTION) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "64": q.add_log_cmd("Star tracker: Request log subscription parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_LOG_SUBSCRIPTION) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "65": q.add_log_cmd("Star tracker: Request debug camera parameters") data = obyt + struct.pack("!I", StarTrackerActionId.REQ_DEBUG_CAMERA) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "66": q.add_log_cmd("Star tracker: Set log level parameters") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.LOGLEVEL) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "67": q.add_log_cmd("Star tracker: Set log subscription parameters") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.LOG_SUBSCRIPTION) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == "68": q.add_log_cmd("Star tracker: Set debug camera parameters") json_file = get_config_file() data = ( obyt + struct.pack("!I", StarTrackerActionId.DEBUG_CAMERA) + bytearray(json_file, "utf-8") ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data)) if op_code == OpCode.FW_UPDATE: q.add_log_cmd(Info.FW_UPDATE) firmware = get_firmware() data = ( obyt + struct.pack("!I", StarTrackerActionId.FIRMWARE_UPDATE) + firmware.encode() ) q.add_pus_tc(PusTelecommand(service=8, subservice=128, 
app_data=data)) if op_code == OpCode.ADD_SECONDARY_TM_TO_NORMAL_MODE: q.add_log_cmd(Info.ADD_SECONDARY_TM_TO_NORMAL_MODE) for val in SetId: print("{:<2}: {:<20}".format(val, val.name)) set_id = int(input("Specify the dataset \n" "")) q.add_pus_tc( create_action_cmd( STAR_TRACKER_ID, StarTrackerActionId.ADD_SECONDARY_TM_TO_NORMAL_MODE, struct.pack("!I", set_id), ) ) if op_code == OpCode.RESET_SECONDARY_TM_SET: q.add_log_cmd(Info.RESET_SECONDARY_TM_SET) q.add_pus_tc( create_action_cmd( STAR_TRACKER_ID, StarTrackerActionId.RESET_SECONDARY_TM_SET, ) ) if op_code == OpCode.READ_SECONDARY_TM_SET: q.add_log_cmd(Info.READ_SECONDARY_TM_SET) q.add_pus_tc( create_action_cmd( STAR_TRACKER_ID, StarTrackerActionId.READ_SECONDARY_TM_SET ) ) if op_code == OpCode.RELOAD_JSON_CFG_FILE: q.add_log_cmd(Info.RELOAD_JSON_CFG_FILE) q.add_pus_tc( create_action_cmd(STAR_TRACKER_ID, StarTrackerActionId.RELOAD_JSON_CFG_FILE) ) def request_dataset(q: DefaultPusQueueHelper, req_type: DataSetRequest): for val in SetId: print("{:<2}: {:<20}".format(val, val.name)) set_id = int(input("Specify the dataset \n" "")) if set_id in [SetId.SOLUTION, SetId.TEMPERATURE]: is_diag = True else: is_diag = False match req_type: case DataSetRequest.ONESHOT: if is_diag: q.add_pus_tc( create_request_one_diag_command(make_sid(STAR_TRACKER_ID, set_id)) ) else: q.add_pus_tc( create_request_one_hk_command(make_sid(STAR_TRACKER_ID, set_id)) ) case DataSetRequest.ENABLE: interval = float( input("Please specify interval in floating point seconds: ") ) if is_diag: cmd_tuple = enable_periodic_hk_command_with_interval( True, make_sid(STAR_TRACKER_ID, set_id), interval ) else: cmd_tuple = enable_periodic_hk_command_with_interval( False, make_sid(STAR_TRACKER_ID, set_id), interval ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) case DataSetRequest.DISABLE: if is_diag: q.add_pus_tc( disable_periodic_hk_command(True, make_sid(STAR_TRACKER_ID, set_id)) ) else: q.add_pus_tc( disable_periodic_hk_command( False, 
make_sid(STAR_TRACKER_ID, set_id) ) ) def pack_read_command(object_id: bytes) -> bytearray: start_region = StartRegion.STAR_TRACKER_FIRMWARE size = PartitionSize.STAR_TRACKER_FIRMWARE path = input("Specify storage location (default - /mnt/sd0/startracker): ") if not path: path = FileDefs.download_path data = ( object_id + struct.pack("!I", StarTrackerActionId.FLASH_READ) + struct.pack("!B", start_region) + struct.pack("!I", size) + bytearray(path, "utf-8") ) return bytearray(data) def pack_checksum_command(object_id: bytes) -> bytearray: start_region = StartRegion.STAR_TRACKER_FIRMWARE address = 0 size = PartitionSize.STAR_TRACKER_FIRMWARE data = ( object_id + struct.pack("!I", StarTrackerActionId.CHECKSUM) + struct.pack("!B", start_region) + struct.pack("!I", address) + struct.pack("!I", size) ) return bytearray(data) def get_config_file() -> str: _LOGGER.info("Specify json file") input_helper = InputHelper(json_dict) key = input_helper.get_key() json_file = json_dict[key][1] return json_file def get_firmware() -> str: _LOGGER.info("Specify firmware file") bin_select = int(input("Use hardcoded paths (0) or specify path manually (1) ?: ")) if bin_select == 0: input_helper = InputHelper(FW_DICT) key = input_helper.get_key() firmware = FW_DICT[key][1] else: firmware = input("Specify absolute path of the firmware update file: ") return firmware def get_upload_image() -> str: _LOGGER.info("Specify image to upload") input_helper = InputHelper(UPLOAD_IMAGE_DICT) key = input_helper.get_key() if UPLOAD_IMAGE_DICT[key][0] == "custom path": image = input("Please specify custom absolute path: ") else: image = UPLOAD_IMAGE_DICT[key][1] return image def handle_str_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper): pw.dlog(f"Received STR HK set with set ID {set_id}") if set_id == SetId.SOLUTION: handle_solution_set(hk_data, pw) elif set_id == SetId.HISTOGRAM: handle_histogram_set(hk_data, pw) elif set_id == SetId.TEMPERATURE: handle_temperature_set(hk_data, pw) elif set_id 
== SetId.AUTO_BLOB: handle_auto_blob_set(hk_data, pw) elif set_id == SetId.MATCHED_CENTROIDS: handle_matched_centroids_set(hk_data, pw) elif set_id == SetId.BLOB: handle_blob_set(hk_data, pw) elif set_id == SetId.BLOBS: handle_blobs_set(hk_data, pw) elif set_id == SetId.CENTROID: handle_centroid_set(hk_data, pw) elif set_id == SetId.CENTROIDS: handle_centroids_set(hk_data, pw) elif set_id == SetId.CONTRAST: handle_contrast_set(hk_data, pw) else: _LOGGER.warning(f"HK parsing for Star Tracker set ID {set_id} unimplemented") def unpack_time_hk(hk_data: bytes, current_idx: int, pw: PrintWrapper) -> int: ticks_time_fmt = "!IQ" fmt_len = struct.calcsize(ticks_time_fmt) (ticks, unix_time) = struct.unpack( ticks_time_fmt, hk_data[current_idx : current_idx + fmt_len] ) try: unix_as_dt = datetime.datetime.fromtimestamp( int(round(unix_time / 1e6)), tz=datetime.timezone.utc ) pw.dlog(f"Ticks: {ticks} | UNIX time: {unix_time}") pw.dlog(f"UNIX as datetime: {unix_as_dt}") except ValueError as e: _LOGGER.exception(e) current_idx += fmt_len return current_idx def handle_temperature_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received temperature set") if len(hk_data) < 24: _LOGGER.warning(f"Temperature dataset HK with length {len(hk_data)} too short") current_idx = unpack_time_hk(hk_data, 0, pw) temps_fmt = "!fff" fmt_len = struct.calcsize(temps_fmt) (mcu_temp, cmos_temp, fpga_temp) = struct.unpack( temps_fmt, hk_data[current_idx : current_idx + fmt_len] ) pw.dlog(f"MCU Temperature: {mcu_temp}") pw.dlog(f"CMOS Temperature: {cmos_temp}") pw.dlog(f"FPGA Temperature: {fpga_temp}") current_idx += fmt_len FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5) def handle_solution_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received solution set") if len(hk_data) < 78: _LOGGER.warning( f"Solution dataset HK data with length {len(hk_data)} too short" ) return current_idx = unpack_time_hk(hk_data, 0, pw) calib_quaternions_fmt = "!ffff" fmt_len = 
struct.calcsize(calib_quaternions_fmt) (calib_q_w, calib_q_x, calib_q_y, calib_q_z) = struct.unpack( calib_quaternions_fmt, hk_data[current_idx : current_idx + fmt_len] ) pw.dlog("Calibrated Quaternions") pw.dlog(f"Quaternion w: {calib_q_w}") pw.dlog(f"Quaternion x: {calib_q_x}") pw.dlog(f"Quaternion y: {calib_q_y}") pw.dlog(f"Quaternion z: {calib_q_z}") current_idx += fmt_len track_fmt = "!fffff" fmt_len = struct.calcsize(track_fmt) (track_confidence, track_q_w, track_q_x, track_q_y, track_q_z) = struct.unpack( track_fmt, hk_data[current_idx : current_idx + fmt_len] ) pw.dlog(f"Track Confidence: {track_confidence}") pw.dlog(f"Track QW: {track_q_w}") pw.dlog(f"Track QX: {track_q_x}") pw.dlog(f"Track QY: {track_q_y}") pw.dlog(f"Track QZ: {track_q_z}") current_idx += fmt_len track_removed = hk_data[current_idx] pw.dlog(f"Number of stars removed from tracking solution: {track_removed}") current_idx += 1 stars_centroided = hk_data[current_idx] pw.dlog(f"Centroided stars: {stars_centroided}") current_idx += 1 stars_matched_database = hk_data[current_idx] pw.dlog(f"Stars matched: {stars_matched_database}") current_idx += 1 # Result of LISA: Lost in space algorithm lisa_fmt = "!fffffB" fmt_len = struct.calcsize(lisa_fmt) ( lisa_q_w, lisa_q_x, lisa_q_y, lisa_q_z, lisa_percentage_close_stars, lisa_number_close_stars, ) = struct.unpack(lisa_fmt, hk_data[current_idx : current_idx + fmt_len]) pw.dlog(f"LISA QW: {lisa_q_w}") pw.dlog(f"LISA QX: {lisa_q_x}") pw.dlog(f"LISA QY: {lisa_q_y}") pw.dlog(f"LISA QZ: {lisa_q_z}") pw.dlog( f"Percentage of close stars in LISA solution: {lisa_percentage_close_stars}" ) pw.dlog(f"Number of close stars in LISA solution: {lisa_number_close_stars}") current_idx += fmt_len str_mode = hk_data[current_idx] pw.dlog(f"STR mode: {str_mode}") current_idx += 1 is_trustworthy = hk_data[current_idx] pw.dlog(f"Trustworthy solution: {is_trustworthy}") current_idx += 1 stable_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] 
pw.dlog(f"Stable count: {stable_count}") current_idx += 4 solution_strategy = hk_data[current_idx] pw.dlog(f"Solution strategy: {solution_strategy}") current_idx += 1 FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23) def handle_blob_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Blob Set") if len(hk_data) < 14: _LOGGER.warning(f"Blob dataset HK data with length {len(hk_data)} too short") return current_idx = unpack_time_hk(hk_data, 0, pw) blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] pw.dlog(f"Blob count: {blob_count}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx + 4 :], num_vars=3) def handle_blobs_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Blobs Set") if len(hk_data) < 6 + 2 * 2 * 8: _LOGGER.warning(f"Blobs dataset HK data with length {len(hk_data)} too short") return current_idx = unpack_time_hk(hk_data, 0, pw) fmt_str = "!HHH" inc_len = struct.calcsize(fmt_str) count, count_used, nr_4lines_skipped = struct.unpack( fmt_str, hk_data[current_idx : current_idx + inc_len] ) current_idx += inc_len pw.dlog( f"Count {count} | Count Used {count_used} | Number of skipped 4lines {nr_4lines_skipped}" ) fmt_coords = "!HHHHHHHH" inc_len = struct.calcsize(fmt_coords) x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.dlog("{:<8} {:<8}".format("X", "Y")) for idx in range(8): pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx])) assert current_idx == len(hk_data) - 1 FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=7) def handle_centroid_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Centroid Set") if len(hk_data) < 14: raise ValueError( f"Centroid dataset HK data with length {len(hk_data)} too short" ) current_idx = unpack_time_hk(hk_data, 0, pw) centroid_count = struct.unpack("!I", hk_data[current_idx : 
current_idx + 4])[0] current_idx += 4 pw.dlog(f"Centroid count: {centroid_count}") assert current_idx == len(hk_data) - 1 FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) def handle_centroids_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Centroids Set") current_idx = unpack_time_hk(hk_data, 0, pw) centroids_count = struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] current_idx += 2 pw.dlog(f"Centroids count: {centroids_count}") fmt_coords = "!ffffffffffffffff" inc_len = struct.calcsize(fmt_coords) x_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len y_coords = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len fmt_coords = "!BBBBBBBBBBBBBBBB" inc_len = struct.calcsize(fmt_coords) magnitude = struct.unpack(fmt_coords, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.dlog("{:<8} {:<8} {:<8} {:<8}".format("Index", "X", "Y", "Magnitude")) for idx in range(16): pw.dlog( "{:<8} {:<8.3f} {:<8.3f} {:<8}".format( idx, x_coords[idx], y_coords[idx], magnitude[idx] ) ) assert current_idx == len(hk_data) - 1 FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6) def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Matched Centroids Set") if len(hk_data) < 4 + 8 + 1 + 4 * 16 * 5: raise ValueError( f"Matched Centroids dataset HK data with length {len(hk_data)} too short. Expected 333 bytes." 
) current_idx = unpack_time_hk(hk_data, 0, pw) num_matched_centroids = hk_data[current_idx] current_idx += 1 pw.dlog(f"Number of matched centroids {num_matched_centroids}") fmt_ids = "!IIIIIIIIIIIIIIII" inc_len = struct.calcsize(fmt_ids) star_id = struct.unpack(fmt_ids, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len fmt_floats = "!ffffffffffffffff" inc_len = struct.calcsize(fmt_floats) x_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len y_coords = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len x_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len y_errors = struct.unpack(fmt_floats, hk_data[current_idx : current_idx + inc_len]) current_idx += inc_len pw.dlog( "{:<8} {:<10} {:<10} {:<10} {:<10} {:<10}".format( "Index", "Star ID", "X", "Y", "X Error", "Y Error" ) ) for idx in range(16): pw.dlog( "{:<8} {:<10} {:<10.3f} {:<10.3f} {:<10.3f} {:<10.3f}".format( idx, star_id[idx], x_coords[idx], y_coords[idx], x_errors[idx], y_errors[idx], ) ) assert current_idx == len(hk_data) - 1 FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8) def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper): pw.dlog("Received Auto Blob Set") if len(hk_data) < 4 + 8 + 4: raise ValueError( f"Matched Centroids dataset HK data with length {len(hk_data)} too short. Expected 16 bytes." 
) current_idx = unpack_time_hk(hk_data, 0, pw) fmt_threshold = "!f" inc_len = struct.calcsize(fmt_threshold) threshold = struct.unpack( fmt_threshold, hk_data[current_idx : current_idx + inc_len] )[0] current_idx += inc_len assert current_idx == len(hk_data) - 1 pw.dlog(f"Threshold {threshold}") FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) def handle_histo_or_contrast_set(name: str, hk_data: bytes, pw: PrintWrapper): pw.dlog(f"Received {name} Set") current_idx = unpack_time_hk(hk_data, 0, pw) fmt_str = "!IIIIIIIII" bins_list = [] inc_len = struct.calcsize(fmt_str) a_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) bins_list.append(a_bins) current_idx += inc_len b_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) bins_list.append(b_bins) current_idx += inc_len c_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) bins_list.append(c_bins) current_idx += inc_len d_bins = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len]) bins_list.append(d_bins) pw.dlog( f"{name} Sections: A Upper Left | B Upper Right | C Lower Left | D Lower Right" ) pw.dlog("{:<12} {:<10} {:<10} {:<10} {:<10}".format("Range", "A", "B", "C", "D")) for idx in range(9): if idx == 0: val_range = "0 (0-0)" elif idx == 1: val_range = "1 (1-1)" else: val_range = f"{idx} ({pow(2, idx - 1)}-{pow(2, idx) - 1})" pw.dlog( "{:<12} {:<10} {:<10} {:<10} {:<10}".format( val_range, bins_list[0][idx], bins_list[1][idx], bins_list[2][idx], bins_list[3][idx], ) ) def handle_histogram_set(hk_data: bytes, pw: PrintWrapper): handle_histo_or_contrast_set("Histogram", hk_data, pw) def handle_contrast_set(hk_data: bytes, pw: PrintWrapper): handle_histo_or_contrast_set("Contrast", hk_data, pw) def handle_star_tracker_action_replies( action_id: int, pw: PrintWrapper, custom_data: bytes ): if action_id == StarTrackerActionId.CHECKSUM: handle_checksum(pw, custom_data) elif action_id == 
StarTrackerActionId.READ_SECONDARY_TM_SET: handle_read_secondary_tm_set(pw, custom_data) def handle_checksum(pw: PrintWrapper, custom_data: bytes): if len(custom_data) != 5: _LOGGER.warning( "Star tracker reply has invalid length {0}".format(len(custom_data)) ) return header_list = ["Checksum", "Checksum valid"] print(custom_data[4]) checksum_valid_flag = custom_data[4] >> 8 content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] pw.dlog(f"{header_list}") pw.dlog(f"{content_list}") def handle_read_secondary_tm_set(pw: PrintWrapper, custom_data: bytes): pw.dlog("Received secondary TM Sets") if len(custom_data) % 4 != 0: raise ValueError(f"Received data of unexpected length {len(custom_data)}") data_length = int(len(custom_data) / 4) fmt_str = "!" + "I" * data_length inc_len = struct.calcsize(fmt_str) set_ids = struct.unpack(fmt_str, custom_data[:inc_len]) pw.dlog("The following Datasets are currently Part of the secondary TM list") for set_id in set_ids: if set_id in SetId._value2member_map_: pw.dlog(SetId(set_id).name) else: pw.dlog(f"Unknown Set ID {set_id}") @tmtc_definitions_provider def add_str_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() oce.add(OpCode.ON_BOOTLOADER, "Mode On, Submode Bootloader") oce.add(OpCode.ON_FIRMWARE, "Mode On, Submode Firmware") oce.add(OpCode.NORMAL, "Mode Normal") oce.add(OpCode.OFF, "Mode Off") oce.add(OpCode.PING, "Star Tracker: Ping") oce.add(OpCode.TAKE_IMAGE, "Take Image") oce.add(OpCode.UPLOAD_IMAGE, Info.UPLOAD_IMAGE) oce.add(OpCode.DOWNLOAD_IMAGE, Info.DOWNLOAD_IMAGE) oce.add(OpCode.ONE_SHOOT_HK, Info.ONE_SHOOT_HK) oce.add(OpCode.ENABLE_HK, Info.ENABLE_HK) oce.add(OpCode.DISABLE_HK, Info.DISABLE_HK) oce.add(OpCode.SET_IMG_PROCESSOR_MODE, Info.SET_IMG_PROCESSOR_MODE) oce.add( OpCode.ADD_SECONDARY_TM_TO_NORMAL_MODE, Info.ADD_SECONDARY_TM_TO_NORMAL_MODE ) oce.add(OpCode.READ_SECONDARY_TM_SET, Info.READ_SECONDARY_TM_SET) oce.add(OpCode.RESET_SECONDARY_TM_SET, Info.RESET_SECONDARY_TM_SET) 
oce.add(OpCode.FW_UPDATE, Info.FW_UPDATE) oce.add(OpCode.SET_TIME_FROM_SYS_TIME, Info.SET_TIME_FROM_SYS_TIME) oce.add(OpCode.RELOAD_JSON_CFG_FILE, Info.RELOAD_JSON_CFG_FILE) defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce)