added bpx handling, reworked srv3/8 returnvalues

This commit is contained in:
Robin Müller 2022-02-03 15:03:02 +01:00
parent 8da50e2c3f
commit 1702d89576
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
10 changed files with 207 additions and 83 deletions

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="BPX Request HK" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s bpx -o hk -t 6 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="BPX Reset Reboot Counter" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s bpx -o rst_boot_cnt -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -17,6 +17,7 @@ class CustomServiceList(enum.Enum):
PDU2 = "pdu2"
ACU = "acu"
ACS = "acs"
BPX_BATTERY = "bpx"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"

View File

@ -1,7 +1,7 @@
import argparse
from typing import Union, Dict, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.definitions import ServiceOpCodeDictT, HkReplyUnpacked, DataReplyUnpacked
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
@ -79,9 +79,8 @@ class EiveHookObject(TmTcHookBase):
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
) -> DataReplyUnpacked:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@ -89,9 +88,8 @@ class EiveHookObject(TmTcHookBase):
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
) -> HkReplyUnpacked:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id,
set_id=set_id,

View File

@ -15,6 +15,7 @@ P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00])
PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01])
PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02])
ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03])
BPX_HANDLER_ID = bytes([0x44, 0x26, 0x00, 0x00])
# Thermal Object IDs
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])

31
pus_tc/bpx_batt.py Normal file
View File

@ -0,0 +1,31 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
class BpxSetIds:
GET_HK_SET = 0
GET_CFG_SET = 5
class BpxActionIds:
REBOOT = 2
RESET_COUNTERS = 3
SET_CFG = 4
GET_CFG = 5
def pack_bpx_commands(tc_queue: TcQueueT, op_code: str):
if op_code in ["0", "hk"]:
tc_queue.appendleft((QueueCommands.PRINT, "Requesting BPX battery HK set"))
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)
cmd = generate_one_hk_command(sid=sid, ssc=0)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in ["1", "rst_boot_cnt"]:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting reboot counters"))
cmd = generate_action_command(
object_id=BPX_HANDLER_ID, action_id=BpxActionIds.RESET_COUNTERS
)
tc_queue.appendleft(cmd.pack_command_tuple())
pass

View File

@ -14,6 +14,7 @@ from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_commands
from pus_tc.pdu1 import pack_pdu1_commands
from pus_tc.bpx_batt import pack_bpx_commands
from pus_tc.acu import pack_acu_test_into
from pus_tc.imtq import pack_imtq_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
@ -94,6 +95,8 @@ def pack_service_queue_user(
return pack_acu_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.BPX_BATTERY.value:
return pack_bpx_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:
object_id = TMP_1075_1_HANDLER_ID
return pack_tmp1075_test_into(

View File

@ -2,8 +2,8 @@
import struct
import os
import datetime
from typing import Tuple
from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from pus_tc.syrlinks_hk_handler import SetIds
@ -13,6 +13,7 @@ from config.object_ids import (
IMTQ_HANDLER_ID,
GPS_HANDLER_0_ID,
GPS_HANDLER_1_ID,
BPX_HANDLER_ID
)
LOGGER = get_console_logger()
@ -20,7 +21,7 @@ LOGGER = get_console_logger()
def handle_user_hk_packet(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
) -> HkReplyUnpacked:
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id == SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
@ -29,7 +30,6 @@ def handle_user_hk_packet(
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
@ -37,21 +37,20 @@ def handle_user_hk_packet(
return handle_self_test_data(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
elif object_id == BPX_HANDLER_ID:
return handle_bpx_hk_data(hk_data=hk_data)
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
return HkReplyUnpacked()
def handle_syrlinks_rx_registers_dataset(
hk_data: bytearray,
) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = [
) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
@ -69,7 +68,7 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
hk_content = [
reply.content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
@ -79,28 +78,28 @@ def handle_syrlinks_rx_registers_dataset(
rx_demod_n0,
rx_data_rate,
]
return hk_header, hk_content, validity_buffer, 8
reply.validity_buffer = hk_data[22:]
reply.num_of_vars = 8
return reply
def handle_syrlinks_tx_registers_dataset(
hk_data: bytearray,
) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
hk_content = [tx_status, tx_waveform, tx_agc_value]
return hk_header, hk_content, validity_buffer, 3
reply.content_list = [tx_status, tx_waveform, tx_agc_value]
reply.validity_buffer = hk_data[4:]
reply.num_of_vars = 3
return reply
def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = [
def handle_self_test_data(hk_data: bytearray) -> HkReplyUnpacked:
reply = HkReplyUnpacked()
reply.hk_header = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
@ -186,7 +185,8 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
hk_content = [
reply.validity_buffer = hk_data[129:]
reply.content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
@ -227,43 +227,49 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature,
fina_coil_z_temperature,
]
return hk_header, hk_content, validity_buffer, len(hk_header)
reply.num_of_vars = len(reply.hk_header)
return reply
def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
def handle_gps_data(hk_data: bytearray) -> HkReplyUnpacked:
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
var_index = 0
header_array = []
content_array = []
reply.header_list = [
"Latitude",
"Longitude",
"Altitude",
"Fix Mode",
"Sats in Use",
"Date",
"Unix Seconds"
]
latitude = struct.unpack("!d", hk_data[0:8])[0]
header_array.append("Latitude")
content_array.append(latitude)
longitude = struct.unpack("!d", hk_data[8:16])[0]
header_array.append("Longitude")
content_array.append(longitude)
altitude = struct.unpack("!d", hk_data[16:24])[0]
header_array.append("Altitude")
content_array.append(altitude)
fix_mode = hk_data[24]
header_array.append("Fix Mode")
content_array.append(fix_mode)
sat_in_use = hk_data[25]
header_array.append("Sats in Use")
content_array.append(sat_in_use)
year = struct.unpack("!H", hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
header_array.append("Date")
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
content_array.append(date_string)
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
header_array.append("Unix Seconds")
content_array.append(unix_seconds)
content_array = [
latitude,
longitude,
altitude,
fix_mode,
sat_in_use,
date_string,
unix_seconds
]
var_index += 13
reply.num_of_vars = var_index
if not os.path.isfile("gps_log.txt"):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
@ -275,5 +281,46 @@ def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
validity_buffer = hk_data[37:39]
return header_array, content_array, validity_buffer, var_index
reply.validity_buffer = hk_data[37:39]
return reply
def handle_bpx_hk_data(hk_data: bytes) -> HkReplyUnpacked:
LOGGER.info(f"Received BPX data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
charge_current = struct.unpack('!H', hk_data[0:2])[0]
discharge_current = struct.unpack('!H', hk_data[2:4])[0]
heater_current = struct.unpack('!H', hk_data[4:6])[0]
batt_voltage = struct.unpack('!H', hk_data[6:8])[0]
batt_temp_1 = struct.unpack('!h', hk_data[8:10])[0]
batt_temp_2 = struct.unpack('!h', hk_data[10:12])[0]
batt_temp_3 = struct.unpack('!h', hk_data[12:14])[0]
batt_temp_4 = struct.unpack('!h', hk_data[14:16])[0]
reboot_cntr = struct.unpack('!I', hk_data[16:20])[0]
boot_cause = hk_data[20]
reply.header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
"Battery Voltage",
"Batt Temp 1",
"Batt Temp 2",
"Batt Temp 3",
"Batt Temp 4",
"Reboot Counter",
"Boot Cause"
]
reply.content_list = [
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause
]
reply.validity_buffer = hk_data[21:]
return reply

View File

@ -1,18 +1,18 @@
import struct
from typing import Tuple
from config.object_ids import *
from pus_tc.imtq import ImtqActionIds
from pus_tc.ploc_mpsoc import PlocReplyIds
from pus_tc.ploc_supervisor import SupvActionIds
from pus_tc.star_tracker import StarTrackerActionIds
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
LOGGER = get_console_logger()
def user_analyze_service_8_data(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
) -> DataReplyUnpacked:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
@ -25,15 +25,16 @@ def user_analyze_service_8_data(
@return:
"""
if object_id == PDU_2_HANDLER_ID:
header_list = ["PDU2 Service 8 Reply"]
reply = DataReplyUnpacked()
reply.header_list = ["PDU2 Service 8 Reply"]
data_string = str()
for index in range(len(custom_data)):
data_string += str(hex(custom_data[index])) + " , "
data_string = data_string.rstrip()
data_string = data_string.rstrip(",")
data_string = data_string.rstrip()
content_list = [data_string]
reply.content_list = [data_string]
return reply
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_MPSOC_ID:
@ -42,56 +43,50 @@ def user_analyze_service_8_data(
return handle_supervisor_replies(action_id, custom_data)
elif object_id == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, custom_data)
else:
header_list = []
content_list = []
return header_list, content_list
return DataReplyUnpacked()
def handle_imtq_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
header_list = []
content_list = []
def handle_imtq_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == struct.unpack("!I", ImtqActionIds.get_commanded_dipole)[0]:
header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"]
reply.header_list = ["Commanded X-Dipole", "Commanded Y-Dipole", "Commanded Z-Dipole"]
x_dipole = struct.unpack("!H", custom_data[:2])
y_dipole = struct.unpack("!H", custom_data[2:4])
z_dipole = struct.unpack("!H", custom_data[4:6])
content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
reply.content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
return reply
def handle_ploc_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
header_list = []
content_list = []
def handle_ploc_replies(action_id: int, custom_data: bytearray) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == PlocReplyIds.tm_mem_read_report:
header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"]
content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]]
return header_list, content_list
reply.header_list = ["PLOC Memory Address", "PLOC Mem Len", "PLOC Read Memory Data"]
reply.content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]]
return reply
def handle_supervisor_replies(
action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
header_list = []
content_list = []
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]]
return header_list, content_list
reply.header_list = ["MRAM Dump"]
reply.content_list = [custom_data[: len(custom_data)]]
return reply
def handle_startracker_replies(
action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
header_list = []
content_list = []
) -> DataReplyUnpacked:
reply = DataReplyUnpacked()
if action_id == StarTrackerActionIds.CHECKSUM:
if len(custom_data) != 5:
LOGGER.warning(
"Star tracker reply has invalid length {0}".format(len(custom_data))
)
return header_list, content_list
header_list = ["Checksum", "Checksum valid"]
return reply
reply.header_list = ["Checksum", "Checksum valid"]
print(custom_data[4])
checksum_valid_flag = custom_data[4] >> 8
content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
return header_list, content_list
reply.content_list = ["0x" + custom_data[:4].hex(), checksum_valid_flag]
return reply

@ -1 +1 @@
Subproject commit 7b8b936f0d18fdbd375da92d43ecdd37d71ded57
Subproject commit 683ae401c7b4b2503bd91f23c3e069ced1b0db9c