robin fixes #137

Merged
mkranz merged 1 commits from kranz_robin_fixes into main 2023-02-21 14:30:34 +01:00
6 changed files with 88 additions and 21 deletions

View File

@ -36,6 +36,7 @@ class CustomServiceList(str, enum.Enum):
POWER = "power"
ACU = "acu"
ACS = "acs"
GYRO = "gyro"
COM_SS = "com"
BPX_BATTERY = "bpx"
HEATER = "heater"

View File

@ -3,6 +3,7 @@
import logging
from typing import cast
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
from eive_tmtc.tmtc.power.power import pack_power_commands
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
@ -177,6 +178,8 @@ def handle_default_procedure(
return pack_syrlinks_command(
object_id=object_id, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.GYRO.value:
return handle_gyr_cmd(q=queue_helper, op_code=op_code)
if service == CustomServiceList.PROCEDURE.value:
return pack_proc_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.PL_PCDU.value:

View File

@ -0,0 +1 @@
from .gyros import add_gyr_cmd_defs

View File

@ -1,13 +1,25 @@
import enum
import logging
import struct
import eive_tmtc.config.object_ids as obj_ids
from tmtccmd.tc import DefaultPusQueueHelper
import eive_tmtc.config.object_ids as obj_ids
from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_hk_command, make_sid
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry, TmtcDefinitionWrapper
from eive_tmtc.config.object_ids import GYRO_0_ADIS_HANDLER_ID, GYRO_1_L3G_HANDLER_ID, GYRO_2_ADIS_HANDLER_ID, GYRO_3_L3G_HANDLER_ID
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class OpCode:
CORE_HK = "core_hk"
CFG_HK = "cfg_hk"
class AdisGyroSetId(enum.IntEnum):
CORE_HK = 0
CFG_HK = 1
@ -17,6 +29,46 @@ class L3gGyroSetId(enum.IntEnum):
CORE_HK = 0
class GyrSel(enum.IntEnum):
GYR_0_ADIS = 0
GYR_1_L3G = 1
GYR_2_ADIS = 2
GYR_3_L3G = 3
GYR_SEL_DICT = {
GyrSel.GYR_0_ADIS: ("GYRO_0_ADIS", GYRO_0_ADIS_HANDLER_ID),
GyrSel.GYR_1_L3G: ("GYRO_1_L3G", GYRO_1_L3G_HANDLER_ID),
GyrSel.GYR_2_ADIS: ("GYRO_2_ADIS", GYRO_2_ADIS_HANDLER_ID),
GyrSel.GYR_3_L3G: ("GYRO_3_L3G", GYRO_3_L3G_HANDLER_ID),
}
def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
print("Please select the Gyro Device")
for (k, v) in GYR_SEL_DICT.items():
print(f"{k}: {v[0]}")
sel_idx = int(input("Select gyro device by index: "))
gyr_info = GYR_SEL_DICT[GyrSel(sel_idx)]
gyr_obj_id = gyr_info[1]
is_adis = False
if sel_idx == GyrSel.GYR_0_ADIS or sel_idx == GyrSel.GYR_2_ADIS:
is_adis = True
core_hk_id = AdisGyroSetId.CORE_HK
else:
core_hk_id = L3gGyroSetId.CORE_HK
if op_code == OpCode.CORE_HK:
q.add_log_cmd(f"Gyro {gyr_info[0]} Core HK")
q.add_pus_tc(create_request_one_hk_command(make_sid(gyr_obj_id, core_hk_id)))
elif op_code == OpCode.CFG_HK:
if not is_adis:
raise ValueError("No config HK for L3 device")
q.add_log_cmd(f"Gyro {gyr_info[0]} CFG HK")
q.add_pus_tc(create_request_one_hk_command(make_sid(gyr_obj_id, AdisGyroSetId.CFG_HK)))
else:
logging.getLogger(__name__).warning(f"invalid op code {op_code} for gyro command")
def handle_gyros_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
@ -43,24 +95,26 @@ def handle_adis_gyro_hk(
pw = PrintWrapper(printer)
fmt_str = "!ddddddf"
inc_len = struct.calcsize(fmt_str)
(angVelocX, angVelocY, angVelocZ, accelX, accelY, accelZ, temp) = struct.unpack(
(ang_veloc_x, ang_veloc_y, ang_veloc_z, accel_x, accel_y, accel_z, temp) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Received ADIS1650X Gyro HK data from object {object_id}")
pw.dlog(
f"Angular Velocities (degrees per second): X {angVelocX} | "
f"Y {angVelocY} | Z {angVelocZ}"
f"Angular Velocities (degrees per second): X {ang_veloc_x} | "
f"Y {ang_veloc_y} | Z {ang_veloc_z}"
)
pw.dlog(f"Acceleration (m/s^2): X {accelX} | Y {accelY} | Z {accelZ}")
pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}")
pw.dlog(f"Temperature {temp} C")
if set_id == AdisGyroSetId.CFG_HK:
pw = PrintWrapper(printer)
fmt_str = "!HBHH"
fmt_str = "!HBHHH"
inc_len = struct.calcsize(fmt_str)
(diag_stat_reg, filter_setting, msc_ctrl_reg, dec_rate_reg) = struct.unpack(
print(len(hk_data))
(diag_stat_reg, filter_setting, range_mdl, msc_ctrl_reg, dec_rate_reg) = struct.unpack(
fmt_str, hk_data[0 : 0 + inc_len]
)
pw.dlog(f"Diagnostic Status Register {diag_stat_reg:#018b}")
pw.dlog(f"Range MDL {range_mdl}")
pw.dlog(f"Filter Settings {filter_setting:#010b}")
pw.dlog(f"Miscellaneous Control Register {msc_ctrl_reg:#018b}")
pw.dlog(f"Decimation Rate {dec_rate_reg:#06x}")
@ -82,3 +136,11 @@ def handle_l3g_gyro_hk(
f"Y {angVelocY} | Z {angVelocZ}"
)
pw.dlog(f"Temperature {temp} °C")
@tmtc_definitions_provider
def add_gyr_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.CORE_HK, info="Request Core HK")
oce.add(keys=OpCode.CFG_HK, info="Request CFG HK")
defs.add_service(CustomServiceList.GYRO, info="Gyro", op_code_entry=oce)

View File

@ -57,7 +57,7 @@ def handle_mgm_rm3100_hk_data(
fmt_str = f"!fff"
inc_len = struct.calcsize(fmt_str)
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])
pw.dlog(f"Received MGM LIS3 from object {object_id}")
pw.dlog(f"Received MGM RM3100 from object {object_id}")
pw.dlog(
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
)

View File

@ -61,7 +61,9 @@ def handle_thermal_controller_hk_data(
pw = PrintWrapper(printer)
pw.dlog("Received device temperature data")
fmt_str = "!fhhhhiiiifffhffffffffffffff"
fmt_len = struct.calcsize(fmt_str)
tm_data = struct.unpack(fmt_str, hk_data[:98])
valid_list = validity_buffer_list(hk_data[fmt_len:], 25)
parsed_data = {
"Q7S_TEMPERATURE": tm_data[0],
"BATTERY_TEMPERATURE_1": tm_data[1],
@ -76,9 +78,7 @@ def handle_thermal_controller_hk_data(
"SYRLINKS_POWER_AMPLIFIER_TEMPERATURE": tm_data[10],
"SYRLINKS_BASEBAND_BOARD_TEMPERATURE": tm_data[11],
"MGT_TEMPERATURE": tm_data[12],
"ACU_TEMPERATURE_1": tm_data[13],
"ACU_TEMPERATURE_2": tm_data[14],
"ACU_TEMPERATURE_3": tm_data[15],
"ACU_TEMPERATURES": (tm_data[13], tm_data[14], tm_data[15]),
"PDU1_TEMPERATURE": tm_data[16],
"PDU2_TEMPERATURE": tm_data[17],
"P60DOCK_TEMPERATURE_1": tm_data[18],
@ -87,18 +87,19 @@ def handle_thermal_controller_hk_data(
"GYRO_1_TEMPERATURE": tm_data[21],
"GYRO_2_TEMPERATURE": tm_data[22],
"GYRO_3_TEMPERATURE": tm_data[23],
"MGM_0_TEMPERATURE": tm_data[24],
"MGM_1_TEMPERATURE": tm_data[25],
"MGM_0_LIS3_TEMPERATURE": tm_data[24],
"MGM_2_LIS3_TEMPERATURE": tm_data[25],
"ADC_PL_PCDU_TEMPERATURE": tm_data[26],
}
printer.file_logger.info(str(parsed_data))
pp = pprint.PrettyPrinter(depth=4)
pp.pprint(parsed_data)
for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.SUS_TEMP_SENSORS:
pw = PrintWrapper(printer)
pw.dlog("Received SUS temperature data")
fmt_str = "!ffffffffffffffffff"
tm_data = struct.unpack(fmt_str, hk_data[: 4 * 18])
fmt_str = "!ffffffffffff"
fmt_len = struct.calcsize(fmt_str)
tm_data = struct.unpack(fmt_str, hk_data[: 12 * 4])
valid_list = validity_buffer_list(hk_data[fmt_len:], 12)
parsed_data = {
"SUS_0": tm_data[0],
"SUS_1": tm_data[1],
@ -113,9 +114,8 @@ def handle_thermal_controller_hk_data(
"SUS_10": tm_data[10],
"SUS_11": tm_data[11],
}
printer.file_logger.info(str(parsed_data))
pp = pprint.PrettyPrinter(depth=4)
pp.pprint(parsed_data)
for idx, (k, v) in enumerate(parsed_data.items()):
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
elif set_id == CtrlSetId.HEATER_INFO:
print("Heater Switch States")
for i in range(8):