diff --git a/.idea/runConfigurations/BPX_Request_HK.xml b/.idea/runConfigurations/BPX_Request_HK.xml new file mode 100644 index 0000000..9783034 --- /dev/null +++ b/.idea/runConfigurations/BPX_Request_HK.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml b/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml new file mode 100644 index 0000000..6d0b32d --- /dev/null +++ b/.idea/runConfigurations/BPX_Reset_Reboot_Counter.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/config/definitions.py b/config/definitions.py index 990b4c1..3a104aa 100644 --- a/config/definitions.py +++ b/config/definitions.py @@ -17,6 +17,7 @@ class CustomServiceList(enum.Enum): PDU2 = "pdu2" ACU = "acu" ACS = "acs" + BPX_BATTERY = "bpx" TMP1075_1 = "tmp1075_1" TMP1075_2 = "tmp1075_2" HEATER = "heater" diff --git a/config/hook_implementations.py b/config/hook_implementations.py index c3c7663..872ea4e 100644 --- a/config/hook_implementations.py +++ b/config/hook_implementations.py @@ -1,7 +1,7 @@ import argparse from typing import Union, Dict, Tuple -from tmtccmd.config.definitions import ServiceOpCodeDictT +from tmtccmd.config.definitions import ServiceOpCodeDictT, HkReplyUnpacked, DataReplyUnpacked from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.tc.definitions import TcQueueT from tmtccmd.com_if.com_interface_base import CommunicationInterface @@ -79,9 +79,8 @@ class EiveHookObject(TmTcHookBase): @staticmethod def handle_service_8_telemetry( object_id: bytes, action_id: int, custom_data: bytearray - ) -> Tuple[list, list]: + ) -> DataReplyUnpacked: from pus_tm.service_8_hook import user_analyze_service_8_data - return user_analyze_service_8_data( object_id=object_id, action_id=action_id, custom_data=custom_data ) @@ -89,9 +88,8 @@ class EiveHookObject(TmTcHookBase): @staticmethod def handle_service_3_housekeeping( object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base - ) -> Tuple[list, list, bytearray, int]: + ) -> HkReplyUnpacked: from pus_tm.hk_handling import handle_user_hk_packet - return handle_user_hk_packet( object_id=object_id, set_id=set_id, diff --git a/config/object_ids.py b/config/object_ids.py index 387418a..2bbe62d 100644 --- a/config/object_ids.py +++ b/config/object_ids.py @@ -15,6 +15,7 @@ P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00]) PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01]) PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02]) ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03]) +BPX_HANDLER_ID = bytes([0x44, 0x26, 0x00, 0x00]) # Thermal Object IDs HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4]) diff --git a/pus_tc/bpx_batt.py b/pus_tc/bpx_batt.py new file mode 100644 index 0000000..2a5a475 --- /dev/null +++ b/pus_tc/bpx_batt.py @@ -0,0 +1,31 @@ +from tmtccmd.tc.definitions import TcQueueT, QueueCommands +from config.object_ids import BPX_HANDLER_ID +from tmtccmd.tc.service_8_functional_cmd import generate_action_command +from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid + + +class BpxSetIds: + GET_HK_SET = 0 + GET_CFG_SET = 5 + + +class BpxActionIds: + REBOOT = 2 + RESET_COUNTERS = 3 + SET_CFG = 4 + GET_CFG = 5 + + +def pack_bpx_commands(tc_queue: TcQueueT, op_code: str): + if op_code in ["0", "hk"]: + tc_queue.appendleft((QueueCommands.PRINT, "Requesting BPX battery HK set")) + sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET) + cmd = generate_one_hk_command(sid=sid, ssc=0) + tc_queue.appendleft(cmd.pack_command_tuple()) + if op_code in ["1", "rst_boot_cnt"]: + tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counters")) + cmd = generate_action_command( + object_id=BPX_HANDLER_ID, action_id=BpxActionIds.RESET_COUNTERS + ) + tc_queue.appendleft(cmd.pack_command_tuple()) + pass diff --git a/pus_tc/tc_packer_hook.py b/pus_tc/tc_packer_hook.py index 853953c..0989deb 100644 --- a/pus_tc/tc_packer_hook.py +++ b/pus_tc/tc_packer_hook.py @@ -14,6 +14,7 @@ from pus_tc.service_200_mode import pack_service200_test_into from pus_tc.p60dock import pack_p60dock_test_into from pus_tc.pdu2 import pack_pdu2_commands from pus_tc.pdu1 import pack_pdu1_commands +from pus_tc.bpx_batt import pack_bpx_commands from pus_tc.acu import pack_acu_test_into from pus_tc.imtq import pack_imtq_test_into from pus_tc.tmp1075 import pack_tmp1075_test_into @@ -94,6 +95,8 @@ def pack_service_queue_user( return pack_acu_test_into( object_id=object_id, tc_queue=service_queue, op_code=op_code ) + if service == CustomServiceList.BPX_BATTERY.value: + return pack_bpx_commands(tc_queue=service_queue, op_code=op_code) if service == CustomServiceList.TMP1075_1.value: object_id = TMP_1075_1_HANDLER_ID return pack_tmp1075_test_into( diff --git a/pus_tm/hk_handling.py b/pus_tm/hk_handling.py index cb6896a..075de72 100644 --- a/pus_tm/hk_handling.py +++ b/pus_tm/hk_handling.py @@ -2,8 +2,8 @@ import struct import os import datetime -from typing import Tuple +from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.tm.service_3_housekeeping import Service3Base from tmtccmd.utility.logger import get_console_logger from pus_tc.syrlinks_hk_handler import SetIds @@ -13,6 +13,7 @@ from config.object_ids import ( IMTQ_HANDLER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID, + BPX_HANDLER_ID ) LOGGER = get_console_logger() @@ -20,7 +21,7 @@ LOGGER = get_console_logger() def handle_user_hk_packet( object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base -) -> Tuple[list, list, bytearray, int]: +) -> HkReplyUnpacked: """This function is called when a Service 3 Housekeeping packet is received.""" if object_id == SYRLINKS_HANDLER_ID: if set_id == SetIds.RX_REGISTERS_DATASET: @@ -29,7 +30,6 @@ def handle_user_hk_packet( return handle_syrlinks_tx_registers_dataset(hk_data) else: LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") - return [], [], bytearray(), 0 elif object_id == IMTQ_HANDLER_ID: if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and ( set_id <= ImtqSetIds.NEGATIVE_Z_TEST @@ -37,21 +37,20 @@ def handle_user_hk_packet( return handle_self_test_data(hk_data) else: LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") - return [], [], bytearray(), 0 elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID: return handle_gps_data(hk_data=hk_data) + elif object_id == BPX_HANDLER_ID: + return handle_bpx_hk_data(hk_data=hk_data) else: LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") - return [], [], bytearray(), 0 + return HkReplyUnpacked() def handle_syrlinks_rx_registers_dataset( hk_data: bytearray, -) -> Tuple[list, list, bytearray, int]: - hk_header = [] - hk_content = [] - validity_buffer = bytearray() - hk_header = [ +) -> HkReplyUnpacked: + reply = HkReplyUnpacked() + reply.header_list = [ "RX Status", "RX Sensitivity", "RX Frequency Shift", @@ -69,7 +68,7 @@ def handle_syrlinks_rx_registers_dataset( rx_demod_eb = struct.unpack("!I", hk_data[13:17]) rx_demod_n0 = struct.unpack("!I", hk_data[17:21]) rx_data_rate = hk_data[21] - hk_content = [ + reply.content_list = [ rx_status, rx_sensitivity, rx_frequency_shift, @@ -79,28 +78,28 @@ def handle_syrlinks_rx_registers_dataset( rx_demod_n0, rx_data_rate, ] - return hk_header, hk_content, validity_buffer, 8 + reply.validity_buffer = hk_data[22:] + reply.num_of_vars = 8 + return reply def handle_syrlinks_tx_registers_dataset( hk_data: bytearray, -) -> Tuple[list, list, bytearray, int]: - hk_header = [] - hk_content = [] - validity_buffer = bytearray() - hk_header = ["TX Status", "TX Waveform", "TX AGC value"] +) -> HkReplyUnpacked: + reply = HkReplyUnpacked() + reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"] tx_status = hk_data[0] tx_waveform = hk_data[1] tx_agc_value = struct.unpack("!H", hk_data[2:4]) - hk_content = [tx_status, tx_waveform, tx_agc_value] - return hk_header, hk_content, validity_buffer, 3 + reply.content_list = [tx_status, tx_waveform, tx_agc_value] + reply.validity_buffer = hk_data[4:] + reply.num_of_vars = 3 + return reply -def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: - hk_header = [] - hk_content = [] - validity_buffer = bytearray() - hk_header = [ +def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked: + reply = HkReplyUnpacked() + reply.hk_header = [ "Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", @@ -186,7 +185,8 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0] fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0] - hk_content = [ + reply.validity_buffer = hk_data[129:] + reply.content_list = [ init_err, init_raw_mag_x, init_raw_mag_y, @@ -227,43 +227,49 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in fina_coil_y_temperature, fina_coil_z_temperature, ] - - return hk_header, hk_content, validity_buffer, len(hk_header) + reply.num_of_vars = len(reply.hk_header) + return reply -def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: +def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked: LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}") + reply = HkReplyUnpacked() var_index = 0 header_array = [] content_array = [] + reply.header_list = [ + "Latitude", + "Longitude", + "Altitude", + "Fix Mode", + "Sats in Use", + "Date", + "Unix Seconds" + ] latitude = struct.unpack("!d", hk_data[0:8])[0] - header_array.append("Latitude") - content_array.append(latitude) longitude = struct.unpack("!d", hk_data[8:16])[0] - header_array.append("Longitude") - content_array.append(longitude) altitude = struct.unpack("!d", hk_data[16:24])[0] - header_array.append("Altitude") - content_array.append(altitude) fix_mode = hk_data[24] - header_array.append("Fix Mode") - content_array.append(fix_mode) sat_in_use = hk_data[25] - header_array.append("Sats in Use") - content_array.append(sat_in_use) year = struct.unpack("!H", hk_data[26:28])[0] month = hk_data[28] day = hk_data[29] hours = hk_data[30] minutes = hk_data[31] seconds = hk_data[32] - header_array.append("Date") date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}" - content_array.append(date_string) unix_seconds = struct.unpack("!I", hk_data[33:37])[0] - header_array.append("Unix Seconds") - content_array.append(unix_seconds) + content_array = [ + latitude, + longitude, + altitude, + fix_mode, + sat_in_use, + date_string, + unix_seconds + ] var_index += 13 + reply.num_of_vars = var_index if not os.path.isfile("gps_log.txt"): with open("gps_log.txt", "w") as gps_file: gps_file.write( @@ -275,5 +281,46 @@ def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]: f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, " f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n" ) - validity_buffer = hk_data[37:39] - return header_array, content_array, validity_buffer, var_index + reply.validity_buffer = hk_data[37:39] + return reply + + +def handle_bpx_hk_data(hk_data: bytes) -> HkReplyUnpacked: + LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}") + reply = HkReplyUnpacked() + charge_current = struct.unpack('!H', hk_data[0:2])[0] + discharge_current = struct.unpack('!H', hk_data[2:4])[0] + heater_current = struct.unpack('!H', hk_data[4:6])[0] + batt_voltage = struct.unpack('!H', hk_data[6:8])[0] + batt_temp_1 = struct.unpack('!h', hk_data[8:10])[0] + batt_temp_2 = struct.unpack('!h', hk_data[10:12])[0] + batt_temp_3 = struct.unpack('!h', hk_data[12:14])[0] + batt_temp_4 = struct.unpack('!h', hk_data[14:16])[0] + reboot_cntr = struct.unpack('!I', hk_data[16:20])[0] + boot_cause = hk_data[20] + reply.header_list = [ + "Charge Current", + "Discharge Current", + "Heater Current", + "Battery Voltage", + "Batt Temp 1", + "Batt Temp 2", + "Batt Temp 3", + "Batt Temp 4", + "Reboot Counter", + "Boot Cause" + ] + reply.content_list = [ + charge_current, + discharge_current, + heater_current, + batt_voltage, + batt_temp_1, + batt_temp_2, + batt_temp_3, + batt_temp_4, + reboot_cntr, + boot_cause + ] + reply.validity_buffer = hk_data[21:] + return reply diff --git a/pus_tm/service_8_hook.py b/pus_tm/service_8_hook.py index f1910b6..b157092 100644 --- a/pus_tm/service_8_hook.py +++ b/pus_tm/service_8_hook.py @@ -1,18 +1,18 @@ import struct -from typing import Tuple from config.object_ids import * from pus_tc.imtq import ImtqActionIds from pus_tc.ploc_mpsoc import PlocReplyIds from pus_tc.ploc_supervisor import SupvActionIds from pus_tc.star_tracker import StarTrackerActionIds from tmtccmd.utility.logger import get_console_logger +from tmtccmd.config.definitions import DataReplyUnpacked LOGGER = get_console_logger() def user_analyze_service_8_data( object_id: bytes, action_id: int, custom_data: bytearray -) -> Tuple[list, list]: +) -> DataReplyUnpacked: """ This function is called by the TMTC core if a Service 8 data reply (subservice 130) is received. The user can return a tuple of two lists, where the first list @@ -25,15 +25,16 @@ def user_analyze_service_8_data( @return: """ if object_id == PDU_2_HANDLER_ID: - header_list = ["PDU2 Service 8 Reply"] - + reply = DataReplyUnpacked() + reply.header_list = ["PDU2 Service 8 Reply"] data_string = str() for index in range(len(custom_data)): data_string += str(hex(custom_data[index])) + " , " data_string = data_string.rstrip() data_string = data_string.rstrip(",") data_string = data_string.rstrip() - content_list = [data_string] + reply.content_list = [data_string] + return reply elif object_id == IMTQ_HANDLER_ID: return handle_imtq_replies(action_id, custom_data) elif object_id == PLOC_MPSOC_ID: @@ -42,56 +43,50 @@ def user_analyze_service_8_data( return handle_supervisor_replies(action_id, custom_data) elif object_id == STAR_TRACKER_ID: return handle_startracker_replies(action_id, custom_data) - else: - header_list = [] - content_list = [] - return header_list, content_list + return DataReplyUnpacked() -def handle_imtq_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]: - header_list = [] - content_list = [] +def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: + reply = DataReplyUnpacked() if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]: - header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"] + reply.header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"] x_dipole = struct.unpack("!H", custom_data[:2]) y_dipole = struct.unpack("!H", custom_data[2:4]) z_dipole = struct.unpack("!H", custom_data[4:6]) - content_list = [x_dipole[0], y_dipole[0], z_dipole[0]] + reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]] + return reply -def handle_ploc_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]: - header_list = [] - content_list = [] +def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked: + reply = DataReplyUnpacked() if action_id == PlocReplyIds.tm_mem_read_report: - header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"] - content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]] - return header_list, content_list + reply.header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"] + reply.content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]] + return reply def handle_supervisor_replies( action_id: int, custom_data: bytearray -) -> Tuple[list, list]: - header_list = [] - content_list = [] +) -> DataReplyUnpacked: + reply = DataReplyUnpacked() if action_id == SupvActionIds.DUMP_MRAM: - header_list = ["MRAM Dump"] - content_list = [custom_data[: len(custom_data)]] - return header_list, content_list + reply.header_list = ["MRAM Dump"] + reply.content_list = [custom_data[: len(custom_data)]] + return reply def handle_startracker_replies( action_id: int, custom_data: bytearray -) -> Tuple[list, list]: - header_list = [] - content_list = [] +) -> DataReplyUnpacked: + reply = DataReplyUnpacked() if action_id == StarTrackerActionIds.CHECKSUM: if len(custom_data) != 5: LOGGER.warning( "Star tracker reply has invalid length {0}".format(len(custom_data)) ) - return header_list, content_list - header_list = ["Checksum", "Checksum valid"] + return reply + reply.header_list = ["Checksum", "Checksum valid"] print(custom_data[4]) checksum_valid_flag = custom_data[4] >> 8 - content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] - return header_list, content_list + reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag] + return reply diff --git a/tmtccmd b/tmtccmd index 7b8b936..683ae40 160000 --- a/tmtccmd +++ b/tmtccmd @@ -1 +1 @@ -Subproject commit 7b8b936f0d18fdbd375da92d43ecdd37d71ded57 +Subproject commit 683ae401c7b4b2503bd91f23c3e069ced1b0db9c