Merge remote-tracking branch 'origin/develop' into mohr/looooop

This commit is contained in:
Ulrich Mohr 2022-05-22 22:02:44 +02:00
commit 1f7faef083
26 changed files with 1084 additions and 693 deletions

24
.run/tmtcloop.run.xml Normal file
View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcloop" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcloop.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -47,4 +47,6 @@ class CustomServiceList(enum.Enum):
SUS_ASS = "sus-ass"
TCS_ASS = "tcs-ass"
TIME = "time"
PROCEDURE = "proc"
TVTTESTPROCEDURE = "tvtestproc"
CONTROLLERS = "controllers"

View File

@ -36,14 +36,14 @@ TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05])
SYRLINKS_HANDLER_ID = bytes([0x44, 0x53, 0x00, 0xA3])
# ACS Object IDs
MGM_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06])
MGM_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07])
MGM_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08])
MGM_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09])
GYRO_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10])
GYRO_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11])
GYRO_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12])
GYRO_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13])
MGM_0_LIS3_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06])
MGM_1_RM3100_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07])
MGM_2_LIS3_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08])
MGM_3_RM3100_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09])
GYRO_0_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10])
GYRO_1_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11])
GYRO_2_ADIS_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12])
GYRO_3_L3G_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13])
GPS_HANDLER_0_ID = bytes([0x44, 0x13, 0x00, 0x45])
GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x47])

View File

@ -34,6 +34,7 @@ def get_eive_service_op_code_dict() -> ServiceOpCodeDictT:
add_pdec_cmds(cmd_dict=service_op_code_dict)
add_heater_cmds(cmd_dict=service_op_code_dict)
add_tmp_sens_cmds(cmd_dict=service_op_code_dict)
add_proc_cmds(cmd_dict=service_op_code_dict)
return service_op_code_dict
@ -946,6 +947,37 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
] = service_ploc_memory_dumper_tuple
def add_proc_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.system.proc import OpCodes, KAI
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.HEATER, info=KAI.HEATER[1]
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.PROCEDURE.value,
info="Procedures",
op_code_entry=op_code_dict,
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.BAT_FT, info=KAI.BAT_FT[1]
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.PCDU_FT, info=KAI.PCDU_FT[1]
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.CORE_FT, info=KAI.CORE_FT[1], options={}
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.PROCEDURE.value,
info="TV Test Procedures",
op_code_entry=op_code_dict,
)
def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
import pus_tc.system.tcs as tcs

10
pus_tc/devs/gyros.py Normal file
View File

@ -0,0 +1,10 @@
import enum
class AdisGyroSetIds(enum.IntEnum):
CORE_HK = 0
CFG_HK = 1
class L3gGyroSetIds(enum.IntEnum):
CORE_HK = 0

9
pus_tc/devs/mgms.py Normal file
View File

@ -0,0 +1,9 @@
import enum
class MgmLis3SetIds(enum.IntEnum):
CORE_HK = 0
class MgmRm3100SetIds(enum.IntEnum):
CORE_HK = 0

View File

@ -16,9 +16,6 @@ from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER
HK_SET_ID = 0x3
class P60OpCodes:
STACK_3V3_ON = ["stack-3v3-on", "1"]
STACK_3V3_OFF = ["stack-3v3-off", "2"]

View File

@ -5,6 +5,7 @@ from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
from .common import command_mode
class AcsOpCodes:
ACS_ASS_A_SIDE = ["0", "acs-a"]
ACS_ASS_B_SIDE = ["1", "acs-b"]

View File

@ -18,10 +18,12 @@ class Info:
def pack_controller_commands(tc_queue: TcQueueT, op_code: str):
parameters = prompt_parameters([{"name": "Mode", "defaultValue": "2"}, {
"name": "Submode", "defaultValue": "0"}])
mode = int(parameters["Mode"])
if mode < 0 or mode > 2:
print("Invalid Mode, defaulting to OFF")
mode = 0
submode = int(parameters["Submode"])
command_mode(
object_id=get_object_from_op_code(op_code),

120
pus_tc/system/proc.py Normal file
View File

@ -0,0 +1,120 @@
from tmtccmd.config import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.pus_3_fsfw_hk import *
from config.object_ids import (
BPX_HANDLER_ID,
P60_DOCK_HANDLER,
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
ACU_HANDLER_ID,
CORE_CONTROLLER_ID,
)
import config.object_ids as oids
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.system.core import SetIds as CoreSetIds
from gomspace.gomspace_common import SetIds as GsSetIds
class OpCodes:
HEATER = ["0", "heater"]
BAT_FT = ["bat-ft"]
CORE_FT = ["core-ft"]
PCDU_FT = ["pcdu-ft"]
class KeyAndInfo:
HEATER = ["Heater", "heater procedure"]
BAT_FT = ["BPX Battery", "battery functional test"]
PCDU_FT = ["PCDU", "PCDU functional test"]
CORE_FT = ["OBC", "OBC functional test"]
KAI = KeyAndInfo
PROC_INFO_DICT = {
KAI.BAT_FT[0]: [OpCodes.BAT_FT, KAI.BAT_FT[1], 120.0, 10.0],
KAI.PCDU_FT[0]: [OpCodes.PCDU_FT, KeyAndInfo.PCDU_FT[1], 120.0, 10.0],
KAI.CORE_FT[0]: [OpCodes.CORE_FT, KeyAndInfo.CORE_FT[1], 120.0, 10.0],
}
def generic_print(tc_queue: TcQueueT, key: str):
info = PROC_INFO_DICT[key]
tc_queue.appendleft(
(QueueCommands.PRINT, f"Executing {info[0]} Procedure (OpCodes: {info[1]})")
)
def pack_generic_hk_listening_cmds(
tc_queue: TcQueueT, proc_key: str, sid: bytes, diag: bool
):
generic_print(tc_queue=tc_queue, key=proc_key)
listen_to_hk_for_x_seconds(
diag=diag,
tc_queue=tc_queue,
device=proc_key,
sid=sid,
interval_seconds=10.0,
collection_time=120.0,
)
def pack_proc_commands(tc_queue: TcQueueT, op_code: str):
if op_code in OpCodes.BAT_FT:
key = KAI.BAT_FT[0]
sid = make_sid(BPX_HANDLER_ID, BpxSetIds.GET_HK_SET)
pack_generic_hk_listening_cmds(
tc_queue=tc_queue, proc_key=key, sid=sid, diag=False
)
if op_code in OpCodes.PCDU_FT:
key = KAI.PCDU_FT[0]
sid = make_sid(P60_DOCK_HANDLER, GsSetIds.P60_CORE)
pack_generic_hk_listening_cmds(
tc_queue=tc_queue, proc_key=key, sid=sid, diag=False
)
if op_code in OpCodes.CORE_FT:
key = KAI.CORE_FT[0]
sid = make_sid(oids.CORE_CONTROLLER_ID, CoreSetIds.HK)
pack_generic_hk_listening_cmds(
tc_queue=tc_queue, proc_key=key, sid=sid, diag=False
)
def listen_to_hk_for_x_seconds(
tc_queue: TcQueueT,
diag: bool,
device: str,
sid: bytes,
interval_seconds: float,
collection_time: float,
):
"""
enable_periodic_hk_command_with_interval(
diag: bool, sid: bytes, interval_seconds: float, ssc: int
"""
"""
function with periodic HK generation
interval_seconds = at which rate HK is saved
collection_time = how long the HK is saved for
device = for which device the HK is saved
functional_test =
diagnostic Hk = yes diagnostic or no diagnostic
sid = structural ID for specific device
device Hk set ID
"""
tc_queue.appendleft((QueueCommands.PRINT, f"Enabling periodic HK for {device}"))
cmd_tuple = enable_periodic_hk_command_with_interval(
diag=diag, sid=sid, interval_seconds=interval_seconds, ssc=0
)
for cmd in cmd_tuple:
tc_queue.appendleft(cmd.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, collection_time))
tc_queue.appendleft((QueueCommands.PRINT, f"Disabling periodic HK for {device}"))
tc_queue.appendleft(
disable_periodic_hk_command(diag=diag, sid=sid, ssc=0).pack_command_tuple()
)

View File

@ -40,6 +40,7 @@ from pus_tc.system.acs import pack_acs_command, pack_sus_cmds
from pus_tc.devs.plpcdu import pack_pl_pcdu_commands
from pus_tc.devs.str_img_helper import pack_str_img_helper_command
from pus_tc.system.tcs import pack_tcs_sys_commands
from pus_tc.system.proc import pack_proc_commands
from pus_tc.system.controllers import pack_controller_commands
from config.definitions import CustomServiceList
from config.object_ids import (
@ -221,6 +222,8 @@ def pack_service_queue_user(
return pack_solar_array_deployment_test_into(
object_id=SOLAR_ARRAY_DEPLOYMENT_ID, tc_queue=service_queue
)
if service == CustomServiceList.PROCEDURE.value:
return pack_proc_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.SUS_ASS.value:
return pack_sus_cmds(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PL_PCDU.value:

66
pus_tm/devs/bpx_bat.py Normal file
View File

@ -0,0 +1,66 @@
import struct
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tm.defs import PrintWrapper
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
if set_id == BpxSetIds.GET_HK_SET:
fmt_str = "!HHHHhhhhIB"
inc_len = struct.calcsize(fmt_str)
(
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
) = struct.unpack(fmt_str, hk_data[0:inc_len])
header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
"Battery Voltage",
"Batt Temp 1",
"Batt Temp 2",
"Batt Temp 3",
"Batt Temp 4",
"Reboot Counter",
"Boot Cause",
]
content_list = [
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
]
validity_buffer = hk_data[inc_len:]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
header_list = [
"Battery Heater Mode",
"Battery Heater Low Limit",
"Battery Heater High Limit",
]
content_list = [battheat_mode, battheat_low, battheat_high]
validity_buffer = hk_data[3:]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)

59
pus_tm/devs/gps.py Normal file
View File

@ -0,0 +1,59 @@
import os
import struct
from datetime import datetime
from pus_tm.defs import PrintWrapper
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
pw = PrintWrapper(printer)
pw.dlog(f"Received GPS data, HK data length {len(hk_data)}")
var_index = 0
header_list = [
"Latitude",
"Longitude",
"Altitude",
"Fix Mode",
"Sats in Use",
"Date",
"Unix Seconds",
]
latitude = struct.unpack("!d", hk_data[0:8])[0]
longitude = struct.unpack("!d", hk_data[8:16])[0]
altitude = struct.unpack("!d", hk_data[16:24])[0]
fix_mode = hk_data[24]
sat_in_use = hk_data[25]
year = struct.unpack("!H", hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
content_list = [
latitude,
longitude,
altitude,
fix_mode,
sat_in_use,
date_string,
unix_seconds,
]
var_index += 13
if not os.path.isfile("gps_log.txt"):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
"Date, Unix Seconds\n"
)
with open("gps_log.txt", "a") as gps_file:
gps_file.write(
f"{datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
validity_buffer = hk_data[37:39]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)

75
pus_tm/devs/gyros.py Normal file
View File

@ -0,0 +1,75 @@
import struct
from pus_tm.defs import PrintWrapper
from tmtccmd.utility import ObjectId
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from pus_tc.devs.gyros import L3gGyroSetIds, AdisGyroSetIds
import config.object_ids as obj_ids
def handle_gyros_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if object_id.as_bytes in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID,
]:
handle_adis_gyro_hk(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
elif object_id.as_bytes in [
obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_3_L3G_HANDLER_ID,
]:
handle_l3g_gyro_hk(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
def handle_adis_gyro_hk(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == AdisGyroSetIds.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ddddddf"
inc_len = struct.calcsize(fmt_str)
(angVelocX, angVelocY, angVelocZ, accelX, accelY, accelZ, temp) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Received ADIS1650X Gyro HK data from object {object_id}")
pw.dlog(
f"Angular Velocities (degrees per second): X {angVelocX} | "
f"Y {angVelocY} | Z {angVelocZ}"
)
pw.dlog(f"Acceleration (m/s^2): X {accelX} | Y {accelY} | Z {accelZ}")
pw.dlog(f"Temperature {temp} C")
if set_id == AdisGyroSetIds.CFG_HK:
pw = PrintWrapper(printer)
fmt_str = "!HBHH"
inc_len = struct.calcsize(fmt_str)
(diag_stat_reg, filter_setting, msc_ctrl_reg, dec_rate_reg) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Diagnostic Status Register {diag_stat_reg:#018b}")
pw.dlog(f"Filter Settings {filter_setting:#010b}")
pw.dlog(f"Miscellaneous Control Register {msc_ctrl_reg:#018b}")
pw.dlog(f"Decimation Rate {dec_rate_reg:#06x}")
def handle_l3g_gyro_hk(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == L3gGyroSetIds:
pw = PrintWrapper(printer)
fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str)
(angVelocX, angVelocY, angVelocZ, temp) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Received L3GD20H Gyro HK data from object {object_id}")
pw.dlog(
f"Angular Velocities (degrees per second): X {angVelocX} | "
f"Y {angVelocY} | Z {angVelocZ}"
)
pw.dlog(f"Temperature {temp} C")

140
pus_tm/devs/imtq_mgt.py Normal file
View File

@ -0,0 +1,140 @@
import struct
from pus_tm.defs import PrintWrapper
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
"Init Raw Mag Z [nT]",
"Init Cal Mag X [nT]",
"Init Cal Mag Y [nT]",
"Init Cal Mag Z [nT]",
"Init Coil X Current [mA]",
"Init Coil Y Current [mA]",
"Init Coil Z Current [mA]",
"Init Coil X Temperature [°C]",
"Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]",
"Err",
"Raw Mag X [nT]",
"Raw Mag Y [nT]",
"Raw Mag Z [nT]",
"Cal Mag X [nT]",
"Cal Mag Y [nT]",
"Cal Mag Z [nT]",
"Coil X Current [mA]",
"Coil Y Current [mA]",
"Coil Z Current [mA]",
"Coil X Temperature [°C]",
"Coil Y Temperature [°C]",
"Coil Z Temperature [°C]",
"Fina Err",
"Fina Raw Mag X [nT]",
"Fina Raw Mag Y [nT]",
"Fina Raw Mag Z [nT]",
"Fina Cal Mag X [nT]",
"Fina Cal Mag Y [nT]",
"Fina Cal Mag Z [nT]",
"Fina Coil X Current [mA]",
"Fina Coil Y Current [mA]",
"Fina Coil Z Current [mA]",
"Fina Coil X Temperature [°C]",
"Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]",
]
# INIT step (no coil actuation)
init_err = hk_data[0]
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
# Actuation step
err = hk_data[43]
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
# FINA step (no coil actuation)
fina_err = hk_data[86]
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
validity_buffer = hk_data[129:]
content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
init_raw_mag_z,
init_cal_mag_x,
init_cal_mag_y,
init_cal_mag_z,
init_coil_x_current,
init_coil_y_current,
init_coil_z_current,
init_coil_x_temperature,
init_coil_y_temperature,
init_coil_z_temperature,
err,
raw_mag_x,
init_raw_mag_y,
raw_mag_z,
cal_mag_x,
cal_mag_y,
cal_mag_z,
coil_x_current,
coil_y_current,
coil_z_current,
coil_x_temperature,
coil_y_temperature,
coil_z_temperature,
fina_err,
fina_raw_mag_x,
fina_raw_mag_y,
fina_raw_mag_z,
fina_cal_mag_x,
fina_cal_mag_y,
fina_cal_mag_z,
fina_coil_x_current,
fina_coil_y_current,
fina_coil_z_current,
fina_coil_x_temperature,
fina_coil_y_temperature,
fina_coil_z_temperature,
]
num_of_vars = len(header_list)
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)

46
pus_tm/devs/mgms.py Normal file
View File

@ -0,0 +1,46 @@
import struct
from pus_tm.defs import PrintWrapper
from pus_tc.devs.mgms import MgmRm3100SetIds, MgmLis3SetIds
from tmtccmd.utility import ObjectId
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
import config.object_ids as obj_ids
def handle_mgm_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if object_id in [obj_ids.MGM_0_LIS3_HANDLER_ID, obj_ids.MGM_2_LIS3_HANDLER_ID]:
handle_mgm_lis3_hk_data(object_id, printer, set_id, hk_data)
pass
def handle_mgm_lis3_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == MgmLis3SetIds.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = "!ffff"
inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z, temp) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Received MGM LIS3 from object {object_id}")
pw.dlog(
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
)
pw.dlog(f"Temperature {temp} C")
def handle_mgm_rm3100_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == MgmRm3100SetIds.CORE_HK:
pw = PrintWrapper(printer)
fmt_str = f"!fff"
inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])
pw.dlog(f"Received MGM LIS3 from object {object_id}")
pw.dlog(
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
)

294
pus_tm/devs/pcdu.py Normal file
View File

@ -0,0 +1,294 @@
import struct
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from pus_tm.defs import PrintWrapper
from gomspace.gomspace_common import SetIds
P60_INDEX_LIST = [
"ACU VCC",
"PDU1 VCC",
"X3 IDLE VCC",
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
]
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
PDU1_CHANNELS_NAMES = [
"TCS Board",
"Syrlinks",
"Startracker",
"MGT",
"SUS Nominal",
"SCEX",
"PLOC",
"ACS A Side",
"Unused Channel 8",
]
PDU2_CHANNELS_NAMES = [
"Q7S",
"Payload PCDU CH1",
"RW",
"TCS Heater In",
"SUS Redundant",
"Deployment Mechanism",
"Payload PCDU CH6",
"ACS B Side",
"Payload Camera",
]
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
class WdtInfo:
def __init__(self, pw: PrintWrapper):
self.wdt_reboots_list = []
self.time_pings_left_list = []
self.pw = pw
def print(self):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
self.pw.dlog(wdt_info)
for idx in range(len(self.wdt_reboots_list)):
self.pw.dlog(
f"{WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx : priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
pw = PrintWrapper(printer=printer)
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
pw.dlog(f"VCC {vcc} mV | VBAT {vbat} mV")
pw.dlog(f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]")
pw.dlog(
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
pw.dlog("Latchups")
for idx in range(len(PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
pw.dlog(content_line)
current_idx += 2
device_types = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_types.append(hk_data[current_idx])
current_idx += 1
device_statuses = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
device_statuses.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo(pw=pw)
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print()
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
pw.dlog(f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
output_enb_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)):
output_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(len(PDU1_CHANNELS_NAMES)):
out_enb = f"{output_enb_list[idx]}".ljust(6)
content_line = (
f"{PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
pw.dlog(content_line)
fmt_str = "!IBh"
inc_len = struct.calcsize(fmt_str)
(boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len]
)
info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature / 10.0}"
)
pw.dlog(info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer=printer)
if set_id == SetIds.P60_CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
pw.dlog(content_line)
fmt_str = "!IBhHhh"
inc_len = struct.calcsize(fmt_str)
(
boot_count,
batt_mode,
batt_current,
batt_voltage,
temp_0,
temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
batt_info = (
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
f"Charge current {batt_current} | Voltage {batt_voltage}"
)
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
pw.dlog(temps)
pw.dlog(batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
if set_id == SetIds.P60_AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0
latchup_list = []
pw.dlog("P60 Dock Latchups")
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0]
)
content_line = f"{P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
pw.dlog(content_line)
current_idx += 2
fmt_str = "!IIHBBHHhhB"
inc_len = struct.calcsize(fmt_str)
(
boot_cause,
uptime,
reset_cause,
heater_on,
conv_5v_on,
dock_vbat,
dock_vcc_c,
batt_temp_0,
batt_temp_1,
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
wdt = WdtInfo(pw=pw)
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
batt_charge_current,
batt_discharge_current,
ant6_depl,
ar6_depl,
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len
device_types = []
device_statuses = []
for idx in range(8):
device_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
device_statuses.append(hk_data[current_idx])
current_idx += 1
util_info = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
)
util_info_2 = (
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
)
pw.dlog(util_info)
pw.dlog(util_info_2)
wdt.print()
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
)
pw.dlog(misc_info)
batt_info = (
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
)
pw.dlog(batt_info)
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
)

67
pus_tm/devs/syrlinks.py Normal file
View File

@ -0,0 +1,67 @@
import struct
from pus_tm.defs import PrintWrapper
from pus_tc.devs.syrlinks_hk_handler import SetIds
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
pw = PrintWrapper(printer)
pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
pw = PrintWrapper(printer)
header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
"RX IQ Power",
"RX AGC Value",
"RX Demod Eb",
"RX Demod N0",
"RX Datarate",
]
rx_status = hk_data[0]
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
rx_iq_power = struct.unpack("!H", hk_data[9:11])
rx_agc_value = struct.unpack("!H", hk_data[11:13])
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
rx_iq_power,
rx_agc_value,
rx_demod_eb,
rx_demod_n0,
rx_data_rate,
]
validity_buffer = hk_data[22:]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
def handle_syrlinks_tx_registers_dataset(
printer: FsfwTmTcPrinter,
hk_data: bytes,
):
pw = PrintWrapper(printer)
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
content_list = [tx_status, tx_waveform, tx_agc_value]
validity_buffer = hk_data[4:]
pw.dlog(str(header_list))
pw.dlog(str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)

View File

@ -33,7 +33,6 @@ def ccsds_tm_handler(apid: int, raw_tm_packet: bytes, _user_args: any) -> None:
pus_factory_hook(raw_tm_packet=raw_tm_packet)
def pus_factory_hook(raw_tm_packet: bytes):
if len(raw_tm_packet) < 8:
LOGGER.warning("Detected packet shorter than 8 bytes!")
@ -81,6 +80,5 @@ def pus_factory_hook(raw_tm_packet: bytes):
packet=raw_tm_packet, srv_subservice=(service_type, subservice_type)
)
except ValueError:
# TODO: Log faulty packet
LOGGER.warning("Invalid packet format detected")
log_raw_unknown_packet(packet=raw_tm_packet, packet_type=PacketTypes.TM)

View File

@ -1,35 +1,32 @@
"""HK Handling for EIVE OBSW"""
import struct
import os
import datetime
from pus_tm.system.tcs import handle_thermal_controller_hk_data, TM_TCP_SERVER
from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.pus_3_fsfw_hk import (
Service3Base,
HkContentType,
Service3FsfwTm,
)
from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds
from pus_tc.devs.p60dock import SetIds
from pus_tc.devs.imtq import ImtqSetIds
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
import config.object_ids as obj_ids
from tmtccmd.logging import get_console_logger
from pus_tm.devs.bpx_bat import handle_bpx_hk_data
from pus_tm.devs.gps import handle_gps_data
from pus_tm.devs.gyros import handle_gyros_hk_data
from pus_tm.devs.imtq_mgt import handle_self_test_data
from pus_tm.devs.pcdu import handle_pdu_data, handle_p60_hk_data
from pus_tm.devs.syrlinks import handle_syrlinks_hk_data
from pus_tc.devs.imtq import ImtqSetIds
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
from pus_tm.defs import FsfwTmTcPrinter, log_to_both
from pus_tm.tm_tcp_server import TmTcpServer
from pus_tm.system.core import handle_core_hk_data
from pus_tm.devs.mgms import handle_mgm_hk_data
import config.object_ids as obj_ids
LOGGER = get_console_logger()
TM_TCP_SERVER = TmTcpServer.getInstance()
def handle_hk_packet(
raw_tm: bytes,
@ -42,9 +39,9 @@ def handle_hk_packet(
named_obj_id = tm_packet.object_id
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:]
TM_TCP_SERVER.report_raw_hk_data(object_id=named_obj_id,
set_id=tm_packet.set_id,
hk_data=hk_data)
TM_TCP_SERVER.report_raw_hk_data(
object_id=named_obj_id, set_id=tm_packet.set_id, hk_data=hk_data
)
printer.generic_hk_tm_print(
content_type=HkContentType.HK,
object_id=named_obj_id,
@ -60,6 +57,7 @@ def handle_hk_packet(
if tm_packet.subservice == 10 or tm_packet.subservice == 12:
LOGGER.warning("HK definitions printout not implemented yet")
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectId,
@ -72,12 +70,7 @@ def handle_regular_hk_print(
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
handle_rw_hk_data(printer, object_id, set_id, hk_data)
if objb == obj_ids.SYRLINKS_HANDLER_ID:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
else:
LOGGER.info("Service 3 TM: Syrlinks handler reply with unknown set id")
handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb == obj_ids.IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (
set_id <= ImtqSetIds.NEGATIVE_Z_TEST
@ -90,7 +83,7 @@ def handle_regular_hk_print(
if objb == obj_ids.BPX_HANDLER_ID:
handle_bpx_hk_data(hk_data=hk_data, set_id=set_id, printer=printer)
if objb == obj_ids.CORE_CONTROLLER_ID:
return handle_core_hk_data(printer=printer, hk_data=hk_data)
return handle_core_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)
if objb == obj_ids.PDU_1_HANDLER_ID:
return handle_pdu_data(
printer=printer, pdu_idx=1, set_id=set_id, hk_data=hk_data
@ -105,635 +98,30 @@ def handle_regular_hk_print(
)
if objb == obj_ids.P60_DOCK_HANDLER:
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
if objb in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
obj_ids.GYRO_1_L3G_HANDLER_ID,
obj_ids.GYRO_2_ADIS_HANDLER_ID,
obj_ids.GYRO_3_L3G_HANDLER_ID,
]:
handle_gyros_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
)
if objb in [
obj_ids.MGM_0_LIS3_HANDLER_ID,
obj_ids.MGM_1_RM3100_HANDLER_ID,
obj_ids.MGM_2_LIS3_HANDLER_ID,
obj_ids.MGM_3_RM3100_HANDLER_ID,
]:
handle_mgm_hk_data(
object_id=object_id, hk_data=hk_data, printer=printer, set_id=set_id
)
if objb == obj_ids.PL_PCDU_ID:
log_to_both(printer, "Received PL PCDU HK data")
if objb == obj_ids.THERMAL_CONTROLLER_ID:
handle_thermal_controller_hk_data( object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data)
handle_thermal_controller_hk_data(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return HkReplyUnpacked()
def handle_syrlinks_rx_registers_dataset( printer: FsfwTmTcPrinter, hk_data: bytes):
reply = HkReplyUnpacked()
header_list = [
"RX Status",
"RX Sensitivity",
"RX Frequency Shift",
"RX IQ Power",
"RX AGC Value",
"RX Demod Eb",
"RX Demod N0",
"RX Datarate",
]
rx_status = hk_data[0]
rx_sensitivity = struct.unpack("!I", hk_data[1:5])
rx_frequency_shift = struct.unpack("!I", hk_data[5:9])
rx_iq_power = struct.unpack("!H", hk_data[9:11])
rx_agc_value = struct.unpack("!H", hk_data[11:13])
rx_demod_eb = struct.unpack("!I", hk_data[13:17])
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])
rx_data_rate = hk_data[21]
content_list = [
rx_status,
rx_sensitivity,
rx_frequency_shift,
rx_iq_power,
rx_agc_value,
rx_demod_eb,
rx_demod_n0,
rx_data_rate,
]
validity_buffer = hk_data[22:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
def handle_syrlinks_tx_registers_dataset(
printer: FsfwTmTcPrinter,
hk_data: bytes,
):
reply = HkReplyUnpacked()
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack("!H", hk_data[2:4])
content_list = [tx_status, tx_waveform, tx_agc_value]
validity_buffer = hk_data[4:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
header_list = [
"Init Err",
"Init Raw Mag X [nT]",
"Init Raw Mag Y [nT]",
"Init Raw Mag Z [nT]",
"Init Cal Mag X [nT]",
"Init Cal Mag Y [nT]",
"Init Cal Mag Z [nT]",
"Init Coil X Current [mA]",
"Init Coil Y Current [mA]",
"Init Coil Z Current [mA]",
"Init Coil X Temperature [°C]",
"Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]",
"Err",
"Raw Mag X [nT]",
"Raw Mag Y [nT]",
"Raw Mag Z [nT]",
"Cal Mag X [nT]",
"Cal Mag Y [nT]",
"Cal Mag Z [nT]",
"Coil X Current [mA]",
"Coil Y Current [mA]",
"Coil Z Current [mA]",
"Coil X Temperature [°C]",
"Coil Y Temperature [°C]",
"Coil Z Temperature [°C]",
"Fina Err",
"Fina Raw Mag X [nT]",
"Fina Raw Mag Y [nT]",
"Fina Raw Mag Z [nT]",
"Fina Cal Mag X [nT]",
"Fina Cal Mag Y [nT]",
"Fina Cal Mag Z [nT]",
"Fina Coil X Current [mA]",
"Fina Coil Y Current [mA]",
"Fina Coil Z Current [mA]",
"Fina Coil X Temperature [°C]",
"Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]",
]
# INIT step (no coil actuation)
init_err = hk_data[0]
init_raw_mag_x = struct.unpack("!f", hk_data[1:5])[0]
init_raw_mag_y = struct.unpack("!f", hk_data[5:9])[0]
init_raw_mag_z = struct.unpack("!f", hk_data[9:13])[0]
init_cal_mag_x = struct.unpack("!f", hk_data[13:17])[0]
init_cal_mag_y = struct.unpack("!f", hk_data[17:21])[0]
init_cal_mag_z = struct.unpack("!f", hk_data[21:25])[0]
init_coil_x_current = struct.unpack("!f", hk_data[25:29])[0]
init_coil_y_current = struct.unpack("!f", hk_data[29:33])[0]
init_coil_z_current = struct.unpack("!f", hk_data[33:37])[0]
init_coil_x_temperature = struct.unpack("!H", hk_data[37:39])[0]
init_coil_y_temperature = struct.unpack("!H", hk_data[39:41])[0]
init_coil_z_temperature = struct.unpack("!H", hk_data[41:43])[0]
# Actuation step
err = hk_data[43]
raw_mag_x = struct.unpack("!f", hk_data[44:48])[0]
raw_mag_y = struct.unpack("!f", hk_data[48:52])[0]
raw_mag_z = struct.unpack("!f", hk_data[52:56])[0]
cal_mag_x = struct.unpack("!f", hk_data[56:60])[0]
cal_mag_y = struct.unpack("!f", hk_data[60:64])[0]
cal_mag_z = struct.unpack("!f", hk_data[64:68])[0]
coil_x_current = struct.unpack("!f", hk_data[68:72])[0]
coil_y_current = struct.unpack("!f", hk_data[72:76])[0]
coil_z_current = struct.unpack("!f", hk_data[76:80])[0]
coil_x_temperature = struct.unpack("!H", hk_data[80:82])[0]
coil_y_temperature = struct.unpack("!H", hk_data[82:84])[0]
coil_z_temperature = struct.unpack("!H", hk_data[84:86])[0]
# FINA step (no coil actuation)
fina_err = hk_data[86]
fina_raw_mag_x = struct.unpack("!f", hk_data[87:91])[0]
fina_raw_mag_y = struct.unpack("!f", hk_data[91:95])[0]
fina_raw_mag_z = struct.unpack("!f", hk_data[95:99])[0]
fina_cal_mag_x = struct.unpack("!f", hk_data[99:103])[0]
fina_cal_mag_y = struct.unpack("!f", hk_data[103:107])[0]
fina_cal_mag_z = struct.unpack("!f", hk_data[107:111])[0]
fina_coil_x_current = struct.unpack("!f", hk_data[111:115])[0]
fina_coil_y_current = struct.unpack("!f", hk_data[115:119])[0]
fina_coil_z_current = struct.unpack("!f", hk_data[119:123])[0]
fina_coil_x_temperature = struct.unpack("!H", hk_data[123:125])[0]
fina_coil_y_temperature = struct.unpack("!H", hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack("!H", hk_data[127:129])[0]
validity_buffer = hk_data[129:]
content_list = [
init_err,
init_raw_mag_x,
init_raw_mag_y,
init_raw_mag_z,
init_cal_mag_x,
init_cal_mag_y,
init_cal_mag_z,
init_coil_x_current,
init_coil_y_current,
init_coil_z_current,
init_coil_x_temperature,
init_coil_y_temperature,
init_coil_z_temperature,
err,
raw_mag_x,
raw_mag_y,
raw_mag_z,
cal_mag_x,
cal_mag_y,
cal_mag_z,
coil_x_current,
coil_y_current,
coil_z_current,
coil_x_temperature,
coil_y_temperature,
coil_z_temperature,
fina_err,
fina_raw_mag_x,
fina_raw_mag_y,
fina_raw_mag_z,
fina_cal_mag_x,
fina_cal_mag_y,
fina_cal_mag_z,
fina_coil_x_current,
fina_coil_y_current,
fina_coil_z_current,
fina_coil_x_temperature,
fina_coil_y_temperature,
fina_coil_z_temperature,
]
num_of_vars = len(header_list)
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
def handle_thermal_controller_hk_data(object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == 0:
LOGGER.info("Received Sensor Temperature data")
# get all the floats
tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16 * 4])
parsed_data = {}
# put them into an nice dictionary
parsed_data["SENSOR_PLOC_HEATSPREADER"] = tm_data[0]
parsed_data["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1]
parsed_data["SENSOR_4K_CAMERA"] = tm_data[2]
parsed_data["SENSOR_DAC_HEATSPREADER"] = tm_data[3]
parsed_data["SENSOR_STARTRACKER"] = tm_data[4]
parsed_data["SENSOR_RW1"] = tm_data[5]
parsed_data["SENSOR_DRO"] = tm_data[6]
parsed_data["SENSOR_SCEX"] = tm_data[7]
parsed_data["SENSOR_X8"] = tm_data[8]
parsed_data["SENSOR_HPA"] = tm_data[9]
parsed_data["SENSOR_TX_MODUL"] = tm_data[10]
parsed_data["SENSOR_MPA"] = tm_data[11]
parsed_data["SENSOR_ACU"] = tm_data[12]
parsed_data["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13]
parsed_data["SENSOR_TCS_BOARD"] = tm_data[14]
parsed_data["SENSOR_MAGNETTORQUER"] = tm_data[15]
TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data)
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
reply = HkReplyUnpacked()
var_index = 0
header_list = [
"Latitude",
"Longitude",
"Altitude",
"Fix Mode",
"Sats in Use",
"Date",
"Unix Seconds",
]
latitude = struct.unpack("!d", hk_data[0:8])[0]
longitude = struct.unpack("!d", hk_data[8:16])[0]
altitude = struct.unpack("!d", hk_data[16:24])[0]
fix_mode = hk_data[24]
sat_in_use = hk_data[25]
year = struct.unpack("!H", hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
unix_seconds = struct.unpack("!I", hk_data[33:37])[0]
content_list = [
latitude,
longitude,
altitude,
fix_mode,
sat_in_use,
date_string,
unix_seconds,
]
var_index += 13
reply.num_of_vars = var_index
if not os.path.isfile("gps_log.txt"):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
"Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, "
"Date, Unix Seconds\n"
)
with open("gps_log.txt", "a") as gps_file:
gps_file.write(
f"{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, "
f"{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n"
)
validity_buffer = hk_data[37:39]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_bpx_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == BpxSetIds.GET_HK_SET:
fmt_str = "!HHHHhhhhIB"
inc_len = struct.calcsize(fmt_str)
(
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
) = struct.unpack(fmt_str, hk_data[0:inc_len])
header_list = [
"Charge Current",
"Discharge Current",
"Heater Current",
"Battery Voltage",
"Batt Temp 1",
"Batt Temp 2",
"Batt Temp 3",
"Batt Temp 4",
"Reboot Counter",
"Boot Cause",
]
content_list = [
charge_current,
discharge_current,
heater_current,
batt_voltage,
batt_temp_1,
batt_temp_2,
batt_temp_3,
batt_temp_4,
reboot_cntr,
boot_cause,
]
validity_buffer = hk_data[inc_len:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
elif set_id == BpxSetIds.GET_CFG_SET:
battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
battheat_high = struct.unpack("!b", hk_data[2:3])[0]
header_list = [
"Battery Heater Mode",
"Battery Heater Low Limit",
"Battery Heater High Limit",
]
content_list = [battheat_mode, battheat_low, battheat_high]
validity_buffer = hk_data[3:]
log_to_both(printer, str(header_list))
log_to_both(printer, str(content_list))
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=10)
def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
fmt_str, hk_data[0: 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
)
log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
P60_INDEX_LIST = [
"ACU VCC",
"PDU1 VCC",
"X3 IDLE VCC",
"PDU2 VCC",
"ACU VBAT",
"PDU1 VBAT",
"X3 IDLE VBAT",
"PDU2 VBAT",
"STACK VBAT",
"STACK 3V3",
"STACK 5V",
"GS3V3",
"GS5V",
]
WDT_LIST = ["GND", "I2C", "CAN", "CSP0", "CSP1"]
PDU1_CHANNELS_NAMES = [
"TCS Board",
"Syrlinks",
"Startracker",
"MGT",
"SUS Nominal",
"SCEX",
"PLOC",
"ACS A Side",
"Unused Channel 8",
]
PDU2_CHANNELS_NAMES = [
"Q7S",
"Payload PCDU CH1",
"RW",
"TCS Heater In",
"SUS Redundant",
"Deployment Mechanism",
"Payload PCDU CH6",
"ACS B Side",
"Payload Camera",
]
PDU_CHANNEL_NAMES = [PDU1_CHANNELS_NAMES, PDU2_CHANNELS_NAMES]
class WdtInfo:
def __init__(self):
self.wdt_reboots_list = []
self.time_pings_left_list = []
def print(self, printer: FsfwTmTcPrinter):
wdt_info = "WDT Type | Reboots | Time or Pings left (CSP only)"
log_to_both(printer, wdt_info)
for idx in range(len(self.wdt_reboots_list)):
log_to_both(
printer,
f"{TmHandler.WDT_LIST[idx].ljust(5)} | "
f"{self.wdt_reboots_list[idx]:010} | {self.time_pings_left_list[idx]:010}",
)
def parse(self, wdt_data: bytes, current_idx: int) -> int:
priv_idx = 0
self.wdt_reboots_list = []
self.time_pings_left_list = []
for idx in range(5):
self.wdt_reboots_list.append(
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(3):
self.time_pings_left_list.append(
struct.unpack("!I", wdt_data[priv_idx: priv_idx + 4])[0]
)
priv_idx += 4
current_idx += 4
for idx in range(2):
self.time_pings_left_list.append(wdt_data[priv_idx])
current_idx += 1
priv_idx += 1
return current_idx
def handle_pdu_data(
printer: FsfwTmTcPrinter, pdu_idx: int, set_id: int, hk_data: bytes
):
current_idx = 0
priv_idx = pdu_idx - 1
if set_id == SetIds.PDU_1_AUX or set_id == SetIds.PDU_2_AUX:
fmt_str = "!hhBBBIIH"
inc_len = struct.calcsize(fmt_str)
(
vcc,
vbat,
conv_enb_0,
conv_enb_1,
conv_enb_2,
boot_cause,
uptime,
reset_cause,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
log_to_both(printer, f"VCC {vcc} mV | VBAT {vbat} mV")
log_to_both(
printer, f"Converter Enables [{conv_enb_0},{conv_enb_1},{conv_enb_2}]"
)
log_to_both(
printer,
f"Boot Cause {boot_cause} | Uptime {uptime} | Reset Cause {reset_cause}",
)
current_idx += inc_len
latchup_list = []
log_to_both(printer, "Latchups")
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
)
content_line = (
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {latchup_list[idx]}"
)
log_to_both(printer, content_line)
current_idx += 2
device_types = []
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
device_types.append(hk_data[current_idx])
current_idx += 1
device_statuses = []
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
device_statuses.append(hk_data[current_idx])
current_idx += 1
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
wdt.print(printer=printer)
if set_id == SetIds.PDU_1_CORE or set_id == SetIds.PDU_2_CORE:
log_to_both(printer, f"Received PDU HK from PDU {pdu_idx}")
current_list = []
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
current_list.append(
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
)
current_idx += 2
output_enb_list = []
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
output_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(len(TmHandler.PDU1_CHANNELS_NAMES)):
out_enb = f"{output_enb_list[idx]}".ljust(6)
content_line = (
f"{TmHandler.PDU_CHANNEL_NAMES[priv_idx][idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
log_to_both(printer, content_line)
fmt_str = "!IBh"
inc_len = struct.calcsize(fmt_str)
(boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx: current_idx + inc_len]
)
info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature / 10.0}"
)
log_to_both(printer, info)
def handle_p60_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.P60_CORE:
log_to_both(printer, "Received P60 Core HK. Voltages in mV, currents in mA")
current_idx = 0
current_list = []
for idx in range(13):
current_list.append(
struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
)
current_idx += 2
voltage_list = []
for idx in range(13):
voltage_list.append(
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
)
current_idx += 2
out_enb_list = []
for idx in range(13):
out_enb_list.append(hk_data[current_idx])
current_idx += 1
header_str = f"{'Name'.ljust(24)} | OutEnb | U [mV] | I [mA]"
print(header_str)
printer.file_logger.info(header_str)
for idx in range(13):
out_enb = f"{out_enb_list[idx]}".ljust(6)
content_line = (
f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {out_enb} | "
f"{voltage_list[idx]:05} | {current_list[idx]:04}"
)
log_to_both(printer, content_line)
fmt_str = "!IBhHhh"
inc_len = struct.calcsize(fmt_str)
(
boot_count,
batt_mode,
batt_current,
batt_voltage,
temp_0,
temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
current_idx += inc_len
batt_info = (
f"Batt: Mode {batt_mode} | Boot Count {boot_count} | "
f"Charge current {batt_current} | Voltage {batt_voltage}"
)
temps = f"In C: Temp 0 {temp_0 / 10.0} | Temp 1 {temp_1 / 10.0} | "
log_to_both(printer, temps)
log_to_both(printer, batt_info)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=9)
if set_id == SetIds.P60_AUX:
log_to_both(printer, "Received P60 AUX HK. Voltages in mV, currents in mA")
current_idx = 0
latchup_list = []
log_to_both(printer, "P60 Dock Latchups")
for idx in range(0, 13):
latchup_list.append(
struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
)
content_line = f"{TmHandler.P60_INDEX_LIST[idx].ljust(24)} | {latchup_list[idx]}"
log_to_both(printer, content_line)
current_idx += 2
fmt_str = "!IIHBBHHhhB"
inc_len = struct.calcsize(fmt_str)
(
boot_cause,
uptime,
reset_cause,
heater_on,
conv_5v_on,
dock_vbat,
dock_vcc_c,
batt_temp_0,
batt_temp_1,
dearm_status,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
current_idx += inc_len
wdt = WdtInfo()
current_idx = wdt.parse(wdt_data=hk_data[current_idx:], current_idx=current_idx)
fmt_str = "!hhbb"
inc_len = struct.calcsize(fmt_str)
(
batt_charge_current,
batt_discharge_current,
ant6_depl,
ar6_depl,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len])
current_idx += inc_len
device_types = []
device_statuses = []
for idx in range(8):
device_types.append(hk_data[current_idx])
current_idx += 1
for idx in range(8):
device_statuses.append(hk_data[current_idx])
current_idx += 1
util_info = (
f"Reset Cause {reset_cause} | Boot Cause {boot_cause} | Uptime {uptime}"
)
util_info_2 = (
f"Conv 5V on {conv_5v_on} | Heater On {heater_on} | "
f"Dock VBAT {dock_vbat} | DOCK VCC Current {dock_vcc_c}"
)
log_to_both(printer, util_info)
log_to_both(printer, util_info_2)
wdt.print(printer)
misc_info = (
f"Dearm {dearm_status} | ANT6 Depl {ant6_depl} | AR6 Deply {ar6_depl}"
)
log_to_both(printer, misc_info)
batt_info = (
f"Batt Temp 0 {batt_temp_0 / 10.0} | Batt Temp 1 {batt_temp_1 / 10.0} | "
f"Charge Current {batt_charge_current} | Discharge Current {batt_discharge_current}"
)
log_to_both(printer, batt_info)
printer.print_validity_buffer(
validity_buffer=hk_data[current_idx:], num_vars=27
)

View File

21
pus_tm/system/core.py Normal file
View File

@ -0,0 +1,21 @@
import struct
from pus_tm.defs import PrintWrapper
from pus_tc.system.core import SetIds
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.HK:
pw = PrintWrapper(printer)
fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str)
(temperature, ps_voltage, pl_voltage) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
printout = (
f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
f"PL Voltage [mV] {pl_voltage}"
)
pw.dlog(printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=3)

41
pus_tm/system/tcs.py Normal file
View File

@ -0,0 +1,41 @@
import struct
from pus_tm.defs import PrintWrapper
from pus_tm.tm_tcp_server import TmTcpServer
from tmtccmd.utility import ObjectId
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
TM_TCP_SERVER = TmTcpServer.getInstance()
def handle_thermal_controller_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == 0:
pw = PrintWrapper(printer)
pw.dlog("Received Sensor Temperature data")
# get all the floats
tm_data = struct.unpack("!ffffffffffffffff", hk_data[: 16 * 4])
parsed_data = {}
# put them into an nice dictionary
parsed_data["SENSOR_PLOC_HEATSPREADER"] = tm_data[0]
parsed_data["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1]
parsed_data["SENSOR_4K_CAMERA"] = tm_data[2]
parsed_data["SENSOR_DAC_HEATSPREADER"] = tm_data[3]
parsed_data["SENSOR_STARTRACKER"] = tm_data[4]
parsed_data["SENSOR_RW1"] = tm_data[5]
parsed_data["SENSOR_DRO"] = tm_data[6]
parsed_data["SENSOR_SCEX"] = tm_data[7]
parsed_data["SENSOR_X8"] = tm_data[8]
parsed_data["SENSOR_HPA"] = tm_data[9]
parsed_data["SENSOR_TX_MODUL"] = tm_data[10]
parsed_data["SENSOR_MPA"] = tm_data[11]
parsed_data["SENSOR_ACU"] = tm_data[12]
parsed_data["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13]
parsed_data["SENSOR_TCS_BOARD"] = tm_data[14]
parsed_data["SENSOR_MAGNETTORQUER"] = tm_data[15]
TM_TCP_SERVER.report_parsed_hk_data(object_id, set_id, parsed_data)

View File

@ -18,8 +18,7 @@ class TmTcpServer:
_Instance = None
def __init__(
self):
def __init__(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -65,7 +64,7 @@ class TmTcpServer:
# dle encode the bytes
# adding a newline because someone might want to look at it in a console
data_json_bytes = self.dle_encoder.encode(data_json_bytes + b'\n')
data_json_bytes = self.dle_encoder.encode(data_json_bytes + b"\n")
try:
sent_length = self.client_connection.send(data_json_bytes)
@ -76,27 +75,25 @@ class TmTcpServer:
self.client_connection.close()
self.client_connection = None
def report_raw_hk_data(self, object_id: ObjectId,
set_id: int,
hk_data: bytes):
def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes):
data_dict = {}
data_dict["type"] = "TM"
data_dict["tmType"] = "Raw HK"
data_dict["objectId"] = object_id.as_string
data_dict["setId"] = set_id
data_dict["rawData"] = base64.b64encode(hk_data).decode()
data_dict = {
"type": "TM",
"tmType": "Raw HK",
"objectId": object_id.as_string,
"setId": set_id,
"rawData": base64.b64encode(hk_data).decode(),
}
self._send_dictionary_over_socket(data_dict)
def report_parsed_hk_data(self, object_id: ObjectId,
set_id: int,
data_dictionary):
data_dict = {}
data_dict["type"] = "TM"
data_dict["tmType"] = "Parsed HK"
data_dict["objectId"] = object_id.as_string
data_dict["setId"] = set_id
data_dict["content"] = data_dictionary
def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary):
data_dict = {
"type": "TM",
"tmType": "Parsed HK",
"objectId": object_id.as_string,
"setId": set_id,
"content": data_dictionary,
}
self._send_dictionary_over_socket(data_dict)
self._send_dictionary_over_socket(data_dict)

@ -1 +1 @@
Subproject commit de8656e3a657782e457630ea10de0fce08856f35
Subproject commit 208f5779327db817841f073d0bbdd96da491b5d9

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""EIVE TMTC Commander"""
from distutils.log import debug
import sys
import traceback
@ -63,11 +62,11 @@ def main():
tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE)
get_console_logger().info("Disabling console logger for continuous operation")
get_console_logger().setLevel("ERROR")
# get_console_logger().info("Disabling console logger for continuous operation")
# get_console_logger().setLevel("ERROR")
tmtccmd.init_and_start_daemons(tmtc_backend=tmtc_backend)
tmtccmd.performOperation(tmtc_backend=tmtc_backend)
tmtc_backend.perform_operation()
# remove cmdline args so that we can reuse code
sys.argv = sys.argv[:1]
@ -81,7 +80,7 @@ def main():
tmtc_backend.set_opcode(args.op_code)
tmtc_backend.set_mode(CoreModeList.CONTINUOUS_MODE)
tmtccmd.performOperation(tmtc_backend=tmtc_backend)
tmtc_backend.perform_operation()
if __name__ == "__main__":