Merge branch 'main' into meier/com-ss
This commit is contained in:
@ -3,3 +3,5 @@ from .solar_array_deployment import add_sa_depl_cmds
|
||||
from .test import add_test_defs
|
||||
from .time import add_time_cmds
|
||||
from .health import add_health_cmd_defs
|
||||
from .system import add_system_cmd_defs
|
||||
from .tm_store import add_persistent_tm_store_cmd_defs
|
||||
|
@ -0,0 +1 @@
|
||||
from .gyros import add_gyr_cmd_defs
|
||||
|
@ -21,6 +21,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
make_sid,
|
||||
enable_periodic_hk_command_with_interval,
|
||||
disable_periodic_hk_command,
|
||||
create_request_one_diag_command,
|
||||
)
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
@ -39,9 +40,14 @@ class SetId(enum.IntEnum):
|
||||
|
||||
|
||||
class Submode(enum.IntEnum):
|
||||
SAFE = 2
|
||||
DETUMBLE = 3
|
||||
IDLE = 4
|
||||
OFF = 0
|
||||
SAFE = 10
|
||||
DETUMBLE = 11
|
||||
IDLE = 12
|
||||
PTG_NADIR = 13
|
||||
PTG_TARGET = 14
|
||||
PTG_TARGET_GS = 15
|
||||
PTG_INERTIAL = 16
|
||||
|
||||
|
||||
class OpCodes:
|
||||
@ -286,7 +292,9 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
elif op_code in OpCodes.REQUEST_PROC_GYR_HK:
|
||||
q.add_log_cmd(Info.REQUEST_PROC_GYR_HK)
|
||||
q.add_pus_tc(
|
||||
generate_one_hk_command(make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET))
|
||||
create_request_one_diag_command(
|
||||
make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
|
||||
)
|
||||
)
|
||||
elif op_code in OpCodes.ENABLE_PROC_GYR_HK:
|
||||
q.add_log_cmd(Info.ENABLE_PROC_GYR_HK)
|
||||
@ -433,30 +441,31 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
return
|
||||
current_idx = 0
|
||||
vec_fmt = "[{:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
for idx in range(12):
|
||||
fmt_str = "!fff"
|
||||
length = struct.calcsize(fmt_str)
|
||||
sus_list = struct.unpack(fmt_str, hk_data[current_idx : current_idx + length])
|
||||
sus_list_formatted = [f"{val:8.3f}" for val in sus_list]
|
||||
sus_list_formatted = vec_fmt.format(*sus_list)
|
||||
current_idx += length
|
||||
pw.dlog(f"SUS {idx} CALIB: {sus_list_formatted}")
|
||||
pw.dlog(f"{f'SUS {idx} CALIB'.ljust(25)}: {sus_list_formatted}")
|
||||
fmt_str = "!ddd"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
sus_vec_tot = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
|
||||
sus_vec_tot = [f"{val:8.3f}" for val in sus_vec_tot]
|
||||
sus_vec_tot = vec_fmt.format(*sus_vec_tot)
|
||||
current_idx += inc_len
|
||||
pw.dlog(f"SUS Vector Total: {sus_vec_tot}")
|
||||
pw.dlog(f"{'SUS Vector Total'.ljust(25)}: {sus_vec_tot}")
|
||||
sus_vec_tot_deriv = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
sus_vec_tot_deriv = [f"{val:8.3f}" for val in {sus_vec_tot_deriv}]
|
||||
sus_vec_tot_deriv = vec_fmt.format(*sus_vec_tot_deriv)
|
||||
current_idx += inc_len
|
||||
pw.dlog(f"SUS Vector Derivative: {sus_vec_tot_deriv}")
|
||||
pw.dlog(f"{'SUS Vector Derivative'.ljust(25)}: {sus_vec_tot_deriv}")
|
||||
sun_ijk_model = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
sun_ijk_model = [f"{val:8.3f}" for val in sun_ijk_model]
|
||||
sun_ijk_model = vec_fmt.format(*sun_ijk_model)
|
||||
current_idx += inc_len
|
||||
pw.dlog(f"SUS ijk Model: {sun_ijk_model}")
|
||||
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=15)
|
||||
|
||||
|
||||
@ -511,8 +520,6 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
|
||||
for entry in zip(print_str_list, formatted_list):
|
||||
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
|
||||
current_idx += 1
|
||||
if PERFORM_MGM_CALIBRATION:
|
||||
perform_mgm_calibration(pw, mgm_0_lis3_floats_ut)
|
||||
assert current_idx == 61
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=6)
|
||||
|
||||
@ -525,31 +532,49 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog("Recieved HK set too small")
|
||||
return
|
||||
current_idx = 0
|
||||
for i in range(5):
|
||||
fmt_str = "!fff"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
mgm_vec = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_vec = [f"{val:8.3f}" for val in mgm_vec]
|
||||
pw.dlog(f"MGM {i}: {mgm_vec}")
|
||||
current_idx += inc_len
|
||||
fmt_str = "!fff"
|
||||
vec_fmt = "[{:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
mgm_0 = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_0_str = vec_fmt.format(*mgm_0)
|
||||
pw.dlog(f"{'MGM 0 Vec'.ljust(25)}: {mgm_0_str}")
|
||||
current_idx += inc_len
|
||||
mgm_1 = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_1_str = vec_fmt.format(*mgm_1)
|
||||
pw.dlog(f"{'MGM 1 Vec'.ljust(25)}: {mgm_1_str}")
|
||||
current_idx += inc_len
|
||||
mgm_2 = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_2_str = vec_fmt.format(*mgm_2)
|
||||
pw.dlog(f"{'MGM 2 Vec'.ljust(25)}: {mgm_2_str}")
|
||||
current_idx += inc_len
|
||||
mgm_3 = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_3_str = vec_fmt.format(*mgm_3)
|
||||
pw.dlog(f"{'MGM 3 Vec'.ljust(25)}: {mgm_3_str}")
|
||||
current_idx += inc_len
|
||||
mgm_4 = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_4_str = vec_fmt.format(*mgm_4)
|
||||
pw.dlog(f"{'MGM 4 Vec'.ljust(25)}: {mgm_4_str}")
|
||||
current_idx += inc_len
|
||||
fmt_str = "!ddd"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
mgm_vec_tot = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
mgm_vec_tot = [f"{val:8.3f}" for val in mgm_vec_tot]
|
||||
mgm_vec_tot = vec_fmt.format(*mgm_vec_tot)
|
||||
current_idx += inc_len
|
||||
pw.dlog(f"MGM Total Vec: {mgm_vec_tot}")
|
||||
pw.dlog(f"{'MGM Total Vec'.ljust(25)}: {mgm_vec_tot}")
|
||||
mgm_vec_tot_deriv = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
mgm_vec_tot_deriv = [f"{val:8.3f}" for val in mgm_vec_tot_deriv]
|
||||
pw.dlog(f"MGM Total Vec Deriv: {mgm_vec_tot_deriv}")
|
||||
mgm_vec_tot_deriv = vec_fmt.format(*mgm_vec_tot_deriv)
|
||||
pw.dlog(f"{'MGM Total Vec Deriv'.ljust(25)}: {mgm_vec_tot_deriv}")
|
||||
current_idx += inc_len
|
||||
mag_igrf_model = struct.unpack(
|
||||
fmt_str, hk_data[current_idx : current_idx + inc_len]
|
||||
)
|
||||
mag_igrf_model = [f"{val:8.3f}" for val in mag_igrf_model]
|
||||
pw.dlog(f"MAG IGRF Model: {mag_igrf_model}")
|
||||
mag_igrf_model = vec_fmt.format(*mag_igrf_model)
|
||||
pw.dlog(f"{'MAG IGRF Model'.ljust(25)}: {mag_igrf_model}")
|
||||
current_idx += inc_len
|
||||
if PERFORM_MGM_CALIBRATION:
|
||||
perform_mgm_calibration(pw, mgm_3)
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=8)
|
||||
|
||||
|
||||
@ -619,7 +644,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
fmt_vec = "!ddd"
|
||||
inc_len_scalar = struct.calcsize(fmt_scalar)
|
||||
inc_len_vec = struct.calcsize(fmt_vec)
|
||||
if len(hk_data) < 2 * inc_len_scalar + inc_len_vec:
|
||||
if len(hk_data) < 2 * inc_len_scalar + 2 * inc_len_vec:
|
||||
pw.dlog("Received HK set too small")
|
||||
return
|
||||
current_idx = 0
|
||||
@ -637,21 +662,32 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_scalar
|
||||
pos = [
|
||||
f"{val:8.3f}"
|
||||
for val in struct.unpack(
|
||||
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_vec
|
||||
velo = [
|
||||
f"{val:8.3f}"
|
||||
for val in struct.unpack(
|
||||
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_vec
|
||||
pw.dlog(f"GPS Latitude: {lat} [rad]")
|
||||
pw.dlog(f"GPS Longitude: {long} [rad]")
|
||||
pw.dlog(f"GPS Position: {pos} [m]")
|
||||
pw.dlog(f"GPS Velocity: {velo} [m/s]")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=4)
|
||||
|
||||
|
||||
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog("Received MEKF Set")
|
||||
fmt_quat = "!dddd"
|
||||
fmt_str_4 = "[{:8.3f}, {:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
fmt_str_3 = "[{:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
fmt_vec = "!ddd"
|
||||
inc_len_quat = struct.calcsize(fmt_quat)
|
||||
inc_len_vec = struct.calcsize(fmt_vec)
|
||||
@ -659,21 +695,12 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog("Received HK set too small")
|
||||
return
|
||||
current_idx = 0
|
||||
quat = [
|
||||
f"{val:8.3f}"
|
||||
for val in struct.unpack(
|
||||
fmt_quat, hk_data[current_idx : current_idx + inc_len_quat]
|
||||
)
|
||||
]
|
||||
quat = struct.unpack(fmt_quat, hk_data[current_idx : current_idx + inc_len_quat])
|
||||
current_idx += inc_len_quat
|
||||
rate = [
|
||||
f"{val:8.3f}"
|
||||
for val in struct.unpack(
|
||||
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
|
||||
)
|
||||
]
|
||||
pw.dlog(f"MEKF Quaternion: {quat}")
|
||||
pw.dlog(f"MEKF Rotational Rate: {rate}")
|
||||
rate = struct.unpack(fmt_vec, hk_data[current_idx : current_idx + inc_len_vec])
|
||||
current_idx += inc_len_vec
|
||||
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
|
||||
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=2)
|
||||
|
||||
|
||||
@ -707,6 +734,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
|
||||
fmt_scalar, hk_data[current_idx : current_idx + inc_len_scalar]
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_scalar
|
||||
pw.dlog(f"Control Values Target Quaternion: {tgt_quat}")
|
||||
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
|
||||
pw.dlog(f"Control Values Error Angle: {err_ang} [rad]")
|
||||
@ -745,6 +773,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
|
||||
fmt_vec3_int16, hk_data[current_idx : current_idx + inc_len_vec3_int16]
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_vec3_int16
|
||||
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
|
||||
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
|
||||
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
|
||||
|
@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
import logging
|
||||
import struct
|
||||
|
||||
@ -6,19 +7,28 @@ from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
|
||||
from tmtccmd.config.tmtc import tmtc_definitions_provider
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
make_sid,
|
||||
create_request_one_hk_command,
|
||||
create_enable_periodic_hk_command_with_interval,
|
||||
create_disable_periodic_hk_command,
|
||||
)
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OpCode:
|
||||
REQ_OS_HK = ["0", "hk-os"]
|
||||
RESET_GNSS = ["5", "reset"]
|
||||
REQ_OS_HK = ["hk"]
|
||||
ENABLE_HK = ["enable_hk"]
|
||||
DISABLE_HK = ["disable_hk"]
|
||||
RESET_GNSS = ["reset"]
|
||||
|
||||
|
||||
class Info:
|
||||
REQ_OS_HK = "Request One-Shot HK"
|
||||
ENABLE_HK = "Enable HK"
|
||||
DISABLE_HK = "Disable HK"
|
||||
RESET_GNSS = "Reset GNSS using reset pin"
|
||||
|
||||
|
||||
@ -31,6 +41,8 @@ def add_gps_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.RESET_GNSS, info=Info.RESET_GNSS)
|
||||
oce.add(keys=OpCode.REQ_OS_HK, info=Info.REQ_OS_HK)
|
||||
oce.add(keys=OpCode.ENABLE_HK, info=Info.ENABLE_HK)
|
||||
oce.add(keys=OpCode.DISABLE_HK, info=Info.DISABLE_HK)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.GPS_CTRL.value,
|
||||
info="GPS/GNSS Controller",
|
||||
@ -39,14 +51,26 @@ def add_gps_cmds(defs: TmtcDefinitionWrapper):
|
||||
|
||||
|
||||
def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
|
||||
sid = make_sid(object_id=object_id, set_id=SetId.HK)
|
||||
if op_code in OpCode.RESET_GNSS:
|
||||
# TODO: This needs to be re-implemented
|
||||
_LOGGER.warning("Reset pin handling needs to be re-implemented")
|
||||
if op_code in OpCode.REQ_OS_HK:
|
||||
q.add_log_cmd(f"GMSS: {Info.REQ_OS_HK}")
|
||||
q.add_pus_tc(
|
||||
generate_one_hk_command(sid=make_sid(object_id=object_id, set_id=SetId.HK))
|
||||
if op_code in OpCode.ENABLE_HK:
|
||||
interval = float(input("Please specify interval in floating point seconds: "))
|
||||
if interval <= 0:
|
||||
raise ValueError("invalid interval")
|
||||
q.add_log_cmd(f"GPS: {Info.ENABLE_HK}")
|
||||
cmds = create_enable_periodic_hk_command_with_interval(
|
||||
diag=False, sid=sid, interval_seconds=interval
|
||||
)
|
||||
for cmd in cmds:
|
||||
q.add_pus_tc(cmd)
|
||||
if op_code in OpCode.DISABLE_HK:
|
||||
q.add_log_cmd(f"gps: {Info.DISABLE_HK}")
|
||||
q.add_pus_tc(create_disable_periodic_hk_command(diag=False, sid=sid))
|
||||
if op_code in OpCode.REQ_OS_HK:
|
||||
q.add_log_cmd(f"GPS: {Info.REQ_OS_HK}")
|
||||
q.add_pus_tc(create_request_one_hk_command(sid=sid))
|
||||
|
||||
|
||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
@ -72,7 +96,12 @@ def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
unix_seconds,
|
||||
) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
|
||||
current_idx += inc_len
|
||||
date_string = f"{day}.{month}.{year} {hours}:{minutes}:{seconds}"
|
||||
if year == 0:
|
||||
date_string = "No date string, year is 0"
|
||||
else:
|
||||
date_string = datetime.datetime(
|
||||
year=year, month=month, day=day, hour=hours, minute=minutes, second=seconds
|
||||
)
|
||||
pw.dlog(f"Lat: {lat} deg")
|
||||
pw.dlog(f"Long: {long} deg")
|
||||
pw.dlog(f"Altitude: {alt} m | Speed: {speed} m/s")
|
||||
|
@ -1,13 +1,25 @@
|
||||
import enum
|
||||
import logging
|
||||
import struct
|
||||
|
||||
import eive_tmtc.config.object_ids as obj_ids
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
|
||||
import eive_tmtc.config.object_ids as obj_ids
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_hk_command, make_sid
|
||||
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry, TmtcDefinitionWrapper
|
||||
from eive_tmtc.config.object_ids import GYRO_0_ADIS_HANDLER_ID, GYRO_1_L3G_HANDLER_ID, GYRO_2_ADIS_HANDLER_ID, GYRO_3_L3G_HANDLER_ID
|
||||
from eive_tmtc.config.definitions import CustomServiceList
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
class OpCode:
|
||||
CORE_HK = "core_hk"
|
||||
CFG_HK = "cfg_hk"
|
||||
|
||||
|
||||
class AdisGyroSetId(enum.IntEnum):
|
||||
CORE_HK = 0
|
||||
CFG_HK = 1
|
||||
@ -17,6 +29,46 @@ class L3gGyroSetId(enum.IntEnum):
|
||||
CORE_HK = 0
|
||||
|
||||
|
||||
class GyrSel(enum.IntEnum):
|
||||
GYR_0_ADIS = 0
|
||||
GYR_1_L3G = 1
|
||||
GYR_2_ADIS = 2
|
||||
GYR_3_L3G = 3
|
||||
|
||||
|
||||
GYR_SEL_DICT = {
|
||||
GyrSel.GYR_0_ADIS: ("GYRO_0_ADIS", GYRO_0_ADIS_HANDLER_ID),
|
||||
GyrSel.GYR_1_L3G: ("GYRO_1_L3G", GYRO_1_L3G_HANDLER_ID),
|
||||
GyrSel.GYR_2_ADIS: ("GYRO_2_ADIS", GYRO_2_ADIS_HANDLER_ID),
|
||||
GyrSel.GYR_3_L3G: ("GYRO_3_L3G", GYRO_3_L3G_HANDLER_ID),
|
||||
}
|
||||
|
||||
|
||||
def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
|
||||
print("Please select the Gyro Device")
|
||||
for (k, v) in GYR_SEL_DICT.items():
|
||||
print(f"{k}: {v[0]}")
|
||||
sel_idx = int(input("Select gyro device by index: "))
|
||||
gyr_info = GYR_SEL_DICT[GyrSel(sel_idx)]
|
||||
gyr_obj_id = gyr_info[1]
|
||||
is_adis = False
|
||||
if sel_idx == GyrSel.GYR_0_ADIS or sel_idx == GyrSel.GYR_2_ADIS:
|
||||
is_adis = True
|
||||
core_hk_id = AdisGyroSetId.CORE_HK
|
||||
else:
|
||||
core_hk_id = L3gGyroSetId.CORE_HK
|
||||
if op_code == OpCode.CORE_HK:
|
||||
q.add_log_cmd(f"Gyro {gyr_info[0]} Core HK")
|
||||
q.add_pus_tc(create_request_one_hk_command(make_sid(gyr_obj_id, core_hk_id)))
|
||||
elif op_code == OpCode.CFG_HK:
|
||||
if not is_adis:
|
||||
raise ValueError("No config HK for L3 device")
|
||||
q.add_log_cmd(f"Gyro {gyr_info[0]} CFG HK")
|
||||
q.add_pus_tc(create_request_one_hk_command(make_sid(gyr_obj_id, AdisGyroSetId.CFG_HK)))
|
||||
else:
|
||||
logging.getLogger(__name__).warning(f"invalid op code {op_code} for gyro command")
|
||||
|
||||
|
||||
def handle_gyros_hk_data(
|
||||
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
|
||||
):
|
||||
@ -43,24 +95,26 @@ def handle_adis_gyro_hk(
|
||||
pw = PrintWrapper(printer)
|
||||
fmt_str = "!ddddddf"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(angVelocX, angVelocY, angVelocZ, accelX, accelY, accelZ, temp) = struct.unpack(
|
||||
(ang_veloc_x, ang_veloc_y, ang_veloc_z, accel_x, accel_y, accel_z, temp) = struct.unpack(
|
||||
fmt_str, hk_data[0 : 0 + inc_len]
|
||||
)
|
||||
pw.dlog(f"Received ADIS1650X Gyro HK data from object {object_id}")
|
||||
pw.dlog(
|
||||
f"Angular Velocities (degrees per second): X {angVelocX} | "
|
||||
f"Y {angVelocY} | Z {angVelocZ}"
|
||||
f"Angular Velocities (degrees per second): X {ang_veloc_x} | "
|
||||
f"Y {ang_veloc_y} | Z {ang_veloc_z}"
|
||||
)
|
||||
pw.dlog(f"Acceleration (m/s^2): X {accelX} | Y {accelY} | Z {accelZ}")
|
||||
pw.dlog(f"Acceleration (m/s^2): X {accel_x} | Y {accel_y} | Z {accel_z}")
|
||||
pw.dlog(f"Temperature {temp} C")
|
||||
if set_id == AdisGyroSetId.CFG_HK:
|
||||
pw = PrintWrapper(printer)
|
||||
fmt_str = "!HBHH"
|
||||
fmt_str = "!HBHHH"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(diag_stat_reg, filter_setting, msc_ctrl_reg, dec_rate_reg) = struct.unpack(
|
||||
print(len(hk_data))
|
||||
(diag_stat_reg, filter_setting, range_mdl, msc_ctrl_reg, dec_rate_reg) = struct.unpack(
|
||||
fmt_str, hk_data[0 : 0 + inc_len]
|
||||
)
|
||||
pw.dlog(f"Diagnostic Status Register {diag_stat_reg:#018b}")
|
||||
pw.dlog(f"Range MDL {range_mdl}")
|
||||
pw.dlog(f"Filter Settings {filter_setting:#010b}")
|
||||
pw.dlog(f"Miscellaneous Control Register {msc_ctrl_reg:#018b}")
|
||||
pw.dlog(f"Decimation Rate {dec_rate_reg:#06x}")
|
||||
@ -82,3 +136,11 @@ def handle_l3g_gyro_hk(
|
||||
f"Y {angVelocY} | Z {angVelocZ}"
|
||||
)
|
||||
pw.dlog(f"Temperature {temp} °C")
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_gyr_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.CORE_HK, info="Request Core HK")
|
||||
oce.add(keys=OpCode.CFG_HK, info="Request CFG HK")
|
||||
defs.add_service(CustomServiceList.GYRO, info="Gyro", op_code_entry=oce)
|
||||
|
@ -5,6 +5,7 @@
|
||||
@author J. Meier
|
||||
@date 25.03.2021
|
||||
"""
|
||||
import logging
|
||||
import struct
|
||||
from typing import List
|
||||
|
||||
@ -27,23 +28,39 @@ from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OpCode:
|
||||
ON = ["on"]
|
||||
NORMAL = ["normal"]
|
||||
OFF = ["off"]
|
||||
SET_DIPOLE = ["set_dipole"]
|
||||
ON = "on"
|
||||
NORMAL = "normal"
|
||||
OFF = "off"
|
||||
SET_DIPOLE = "set_dipole"
|
||||
REQUEST_ENG_HK = "hk_os_eng_hk"
|
||||
REQUEST_MGM_RAW = "hk_os_mgm_raw"
|
||||
POS_X_SELF_TEST = "self_test_pos_x"
|
||||
NEG_X_SELF_TEST = "self_test_neg_x"
|
||||
POS_Y_SELF_TEST = "self_test_pos_y"
|
||||
NEG_Y_SELF_TEST = "self_test_neg_y"
|
||||
POS_Z_SELF_TEST = "self_test_pos_z"
|
||||
NEG_Z_SELF_TEST = "self_test_neg_z"
|
||||
|
||||
|
||||
class ImtqSetId:
|
||||
ENG_HK_SET = 1
|
||||
CAL_MTM_SET = 2
|
||||
RAW_MTM_SET = 3
|
||||
POSITIVE_X_TEST = 4
|
||||
NEGATIVE_X_TEST = 5
|
||||
POSITIVE_Y_TEST = 6
|
||||
NEGATIVE_Y_TEST = 7
|
||||
POSITIVE_Z_TEST = 8
|
||||
NEGATIVE_Z_TEST = 9
|
||||
ENG_HK_NO_TORQUE = 1
|
||||
RAW_MTM_NO_TORQUE = 2
|
||||
ENG_HK_SET_WITH_TORQUE = 3
|
||||
RAW_MTM_WITH_TORQUE = 4
|
||||
STATUS_SET = 5
|
||||
DIPOLES = 6
|
||||
|
||||
CAL_MTM_SET = 9
|
||||
POSITIVE_X_TEST = 10
|
||||
NEGATIVE_X_TEST = 11
|
||||
POSITIVE_Y_TEST = 12
|
||||
NEGATIVE_Y_TEST = 13
|
||||
POSITIVE_Z_TEST = 14
|
||||
NEGATIVE_Z_TEST = 15
|
||||
|
||||
|
||||
class ImtqActionId:
|
||||
@ -66,17 +83,17 @@ def add_imtq_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce.add(OpCode.OFF, "Mode Off")
|
||||
oce.add(OpCode.ON, "Mode On")
|
||||
oce.add(OpCode.NORMAL, "Mode Normal")
|
||||
oce.add("3", "IMTQ perform pos X self test")
|
||||
oce.add("4", "IMTQ perform neg X self test")
|
||||
oce.add("5", "IMTQ perform pos Y self test")
|
||||
oce.add("6", "IMTQ perform neg Y self test")
|
||||
oce.add("7", "IMTQ perform pos Z self test")
|
||||
oce.add("8", "IMTQ perform neg Z self test")
|
||||
oce.add(OpCode.REQUEST_ENG_HK, "Request Engineering HK One Shot")
|
||||
oce.add(OpCode.REQUEST_MGM_RAW, "Request MGM Raw HK One Shot")
|
||||
oce.add(OpCode.POS_X_SELF_TEST, "IMTQ perform pos X self test")
|
||||
oce.add(OpCode.NEG_X_SELF_TEST, "IMTQ perform neg X self test")
|
||||
oce.add(OpCode.POS_Y_SELF_TEST, "IMTQ perform pos Y self test")
|
||||
oce.add(OpCode.NEG_Y_SELF_TEST, "IMTQ perform neg Y self test")
|
||||
oce.add(OpCode.POS_Z_SELF_TEST, "IMTQ perform pos Z self test")
|
||||
oce.add(OpCode.NEG_Z_SELF_TEST, "IMTQ perform neg Z self test")
|
||||
oce.add(OpCode.SET_DIPOLE, "IMTQ command dipole")
|
||||
oce.add("10", "IMTQ get commanded dipole")
|
||||
oce.add("11", "IMTQ get engineering hk set")
|
||||
oce.add("12", "IMTQ get calibrated MTM measurement one shot")
|
||||
oce.add("13", "IMTQ get raw MTM measurement one shot")
|
||||
defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce)
|
||||
|
||||
|
||||
@ -85,19 +102,19 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
|
||||
)
|
||||
|
||||
if op_code in OpCode.OFF:
|
||||
if op_code == OpCode.OFF:
|
||||
q.add_log_cmd("IMTQ: Set mode off")
|
||||
command = pack_mode_data(object_id.as_bytes, Mode.OFF, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
|
||||
if op_code in OpCode.ON:
|
||||
if op_code == OpCode.ON:
|
||||
q.add_log_cmd("IMTQ: Set mode on")
|
||||
command = pack_mode_data(object_id.as_bytes, Mode.ON, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
|
||||
if op_code in OpCode.NORMAL:
|
||||
if op_code == OpCode.NORMAL:
|
||||
q.add_log_cmd("IMTQ: Mode Normal")
|
||||
command = pack_mode_data(object_id.as_bytes, Mode.NORMAL, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=command))
|
||||
if op_code == "3":
|
||||
if op_code == OpCode.POS_X_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform positive x self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_positive_x_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -110,7 +127,7 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
sid = make_sid(object_id.as_bytes, ImtqSetId.POSITIVE_X_TEST)
|
||||
q.add_pus_tc(generate_one_hk_command(sid))
|
||||
|
||||
if op_code == "4":
|
||||
if op_code == OpCode.NEG_X_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform negative x self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_negative_x_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -121,7 +138,7 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
sid = make_sid(object_id.as_bytes, ImtqSetId.NEGATIVE_X_TEST)
|
||||
q.add_pus_tc(generate_one_hk_command(sid))
|
||||
|
||||
if op_code == "5":
|
||||
if op_code == OpCode.POS_Y_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform positive y self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_positive_y_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -133,7 +150,7 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
sid = make_sid(object_id.as_bytes, ImtqSetId.POSITIVE_Y_TEST)
|
||||
q.add_pus_tc(generate_one_hk_command(sid))
|
||||
|
||||
if op_code == "6":
|
||||
if op_code == OpCode.NEG_Y_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform negative y self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_negative_y_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -146,7 +163,7 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
sid = make_sid(object_id.as_bytes, ImtqSetId.NEGATIVE_Y_TEST)
|
||||
q.add_pus_tc(generate_one_hk_command(sid))
|
||||
|
||||
if op_code == "7":
|
||||
if op_code == OpCode.POS_Z_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform positive z self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_positive_z_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -159,7 +176,7 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
sid = make_sid(object_id.as_bytes, ImtqSetId.POSITIVE_Y_TEST)
|
||||
q.add_pus_tc(generate_one_hk_command(sid))
|
||||
|
||||
if op_code == "8":
|
||||
if op_code == OpCode.NEG_Z_SELF_TEST:
|
||||
q.add_log_cmd("IMTQ: Perform negative z self test")
|
||||
command = object_id.as_bytes + ImtqActionId.perform_negative_z_test
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
@ -195,11 +212,13 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
command = object_id.as_bytes + ImtqActionId.get_commanded_dipole
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
|
||||
if op_code == "11":
|
||||
if op_code == OpCode.REQUEST_ENG_HK:
|
||||
q.add_log_cmd("IMTQ: Get engineering hk set")
|
||||
q.add_pus_tc(
|
||||
generate_one_diag_command(
|
||||
sid=make_sid(object_id=object_id.as_bytes, set_id=ImtqSetId.ENG_HK_SET)
|
||||
sid=make_sid(
|
||||
object_id=object_id.as_bytes, set_id=ImtqSetId.ENG_HK_NO_TORQUE
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -211,11 +230,13 @@ def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_cod
|
||||
)
|
||||
)
|
||||
|
||||
if op_code == "13":
|
||||
if op_code == OpCode.REQUEST_MGM_RAW:
|
||||
q.add_log_cmd("IMTQ: Get raw MTM hk set")
|
||||
q.add_pus_tc(
|
||||
generate_one_diag_command(
|
||||
sid=make_sid(object_id=object_id.as_bytes, set_id=ImtqSetId.RAW_MTM_SET)
|
||||
sid=make_sid(
|
||||
object_id=object_id.as_bytes, set_id=ImtqSetId.RAW_MTM_NO_TORQUE
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -262,6 +283,14 @@ def raise_dipole_error(dipole_str: str, value: int):
|
||||
)
|
||||
|
||||
|
||||
STATUS_HEADERS = [
|
||||
"Status Byte Mode",
|
||||
"Status Byte Error",
|
||||
"Status Byte Config",
|
||||
"Status Byte Uptime [s]",
|
||||
]
|
||||
|
||||
|
||||
ENG_HK_HEADERS = [
|
||||
"Digital Voltage [mV]",
|
||||
"Analog Voltage [mV]",
|
||||
@ -277,6 +306,39 @@ ENG_HK_HEADERS = [
|
||||
]
|
||||
|
||||
|
||||
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
|
||||
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
|
||||
return handle_self_test_data(printer, hk_data)
|
||||
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
|
||||
_LOGGER.info("Found engineering HK without torque")
|
||||
return handle_eng_set(printer, hk_data)
|
||||
elif set_id == ImtqSetId.ENG_HK_SET_WITH_TORQUE:
|
||||
_LOGGER.info("Found engineering HK during torque")
|
||||
return handle_eng_set(printer, hk_data)
|
||||
elif set_id == ImtqSetId.CAL_MTM_SET:
|
||||
return handle_calibrated_mtm_measurement(printer, hk_data)
|
||||
elif set_id == ImtqSetId.RAW_MTM_NO_TORQUE:
|
||||
_LOGGER.info("Found raw MTM measurement without torque")
|
||||
return handle_raw_mtm_measurement(printer, hk_data)
|
||||
elif set_id == ImtqSetId.RAW_MTM_WITH_TORQUE:
|
||||
_LOGGER.info("Found raw MTM measurement during torque")
|
||||
return handle_raw_mtm_measurement(printer, hk_data)
|
||||
elif set_id == ImtqSetId.STATUS_SET:
|
||||
return handle_status_set(printer, hk_data)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"IMTQ handler HK reply with unknown or unimplemented set id {set_id}"
|
||||
)
|
||||
|
||||
|
||||
def unpack_status_set(hk_data: bytes) -> List:
|
||||
status_mode = hk_data[0]
|
||||
status_error = hk_data[1]
|
||||
status_conf = hk_data[2]
|
||||
status_uptime = struct.unpack("!I", hk_data[3:7])[0]
|
||||
return [status_mode, status_error, status_conf, status_uptime]
|
||||
|
||||
|
||||
def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
digital_voltage = struct.unpack("!H", hk_data[0:2])[0]
|
||||
analog_voltage = struct.unpack("!H", hk_data[2:4])[0]
|
||||
@ -311,8 +373,19 @@ def handle_eng_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
validity_buffer = hk_data[32:]
|
||||
|
||||
num_of_vars = len(ENG_HK_HEADERS)
|
||||
pw.dlog(str(ENG_HK_HEADERS))
|
||||
pw.dlog(str(content_list))
|
||||
for k, v in zip(ENG_HK_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
|
||||
def handle_status_set(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
content_list = unpack_status_set(hk_data)
|
||||
validity_buffer = hk_data[7:]
|
||||
|
||||
num_of_vars = 4
|
||||
for k, v in zip(STATUS_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
|
||||
|
||||
|
@ -57,7 +57,7 @@ def handle_mgm_rm3100_hk_data(
|
||||
fmt_str = f"!fff"
|
||||
inc_len = struct.calcsize(fmt_str)
|
||||
(field_x, field_y, field_z) = struct.unpack(fmt_str, hk_data[0 : 0 + inc_len])
|
||||
pw.dlog(f"Received MGM LIS3 from object {object_id}")
|
||||
pw.dlog(f"Received MGM RM3100 from object {object_id}")
|
||||
pw.dlog(
|
||||
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
|
||||
)
|
||||
|
@ -4,6 +4,7 @@
|
||||
@author J. Meier
|
||||
@date 20.06.2021
|
||||
"""
|
||||
import enum
|
||||
import struct
|
||||
from typing import List
|
||||
|
||||
@ -64,7 +65,7 @@ class InfoAss:
|
||||
ALL_SPEED_OFF = "Speed down to 0"
|
||||
|
||||
|
||||
class RwSetId:
|
||||
class RwSetId(enum.IntEnum):
|
||||
STATUS_SET_ID = 4
|
||||
TEMPERATURE_SET_ID = 8
|
||||
LAST_RESET = 2
|
||||
|
@ -20,16 +20,22 @@ class OpCode(str, enum.Enum):
|
||||
SAFE = "safe"
|
||||
DETUMBLE = "detumble"
|
||||
IDLE = "idle"
|
||||
TARGET_PT = "target"
|
||||
PTG_TARGET = "ptg_target"
|
||||
PTG_TARGET_NADIR = "ptg_nadir"
|
||||
PTG_TARGET_GS = "ptg_target_gs"
|
||||
PTG_TARGET_INERTIAL = "ptg_inertial"
|
||||
REPORT_ALL_MODES = "all_modes"
|
||||
|
||||
|
||||
class AcsMode(enum.IntEnum):
|
||||
OFF = 0
|
||||
SAFE = 1 << 24
|
||||
DETUMBLE = 2 << 24
|
||||
IDLE = 3 << 24
|
||||
TARGET_PT = 4 << 24
|
||||
SAFE = 10
|
||||
DETUMBLE = 11
|
||||
IDLE = 12
|
||||
PTG_TARGET_NADIR = 13
|
||||
PTG_TARGET = 14
|
||||
PTG_TARGET_GS = 15
|
||||
PTG_TARGET_INERTIAL = 16
|
||||
|
||||
|
||||
class Info(str, enum.Enum):
|
||||
@ -37,15 +43,22 @@ class Info(str, enum.Enum):
|
||||
SAFE = "Safe Mode Command"
|
||||
DETUMBLE = "Detumble Mode Command"
|
||||
IDLE = "Idle Mode Command"
|
||||
TARGET_PT = "Target Pointing Mode Command"
|
||||
PTG_TARGET_NADIR = "Target Pointing Nadir"
|
||||
PTG_TARGET = "Target Pointing"
|
||||
PTG_TARGET_GS = "Target Pointing Ground Station"
|
||||
PTG_TARGET_INERTIAL = "Target Pointing Inertial"
|
||||
REPORT_ALL_MODES = "Report All Modes Recursively"
|
||||
|
||||
|
||||
HANDLER_LIST: Dict[str, Tuple[int, str]] = {
|
||||
OpCode.OFF: (AcsMode.OFF, Info.OFF),
|
||||
OpCode.IDLE: (AcsMode.IDLE, Info.IDLE),
|
||||
OpCode.SAFE: (AcsMode.SAFE, Info.SAFE),
|
||||
OpCode.DETUMBLE: (AcsMode.DETUMBLE, Info.DETUMBLE),
|
||||
OpCode.IDLE: (AcsMode.IDLE, Info.IDLE),
|
||||
OpCode.PTG_TARGET: (AcsMode.PTG_TARGET, Info.PTG_TARGET),
|
||||
OpCode.PTG_TARGET_GS: (AcsMode.PTG_TARGET_GS, Info.PTG_TARGET_GS),
|
||||
OpCode.PTG_TARGET_NADIR: (AcsMode.PTG_TARGET_NADIR, Info.PTG_TARGET_NADIR),
|
||||
OpCode.PTG_TARGET_INERTIAL: (AcsMode.PTG_TARGET_INERTIAL, Info.PTG_TARGET_INERTIAL),
|
||||
}
|
||||
|
||||
|
||||
@ -78,8 +91,7 @@ def build_acs_subsystem_cmd(p: ServiceProviderParams):
|
||||
@tmtc_definitions_provider
|
||||
def add_acs_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(OpCode.OFF, Info.OFF)
|
||||
oce.add(OpCode.SAFE, Info.SAFE)
|
||||
oce.add(OpCode.IDLE, Info.IDLE)
|
||||
for op_code, (_, info) in HANDLER_LIST.items():
|
||||
oce.add(op_code, info)
|
||||
oce.add(OpCode.REPORT_ALL_MODES, Info.REPORT_ALL_MODES)
|
||||
defs.add_service(CustomServiceList.ACS_SS, "ACS Subsystem", oce)
|
||||
|
@ -1,3 +1,4 @@
|
||||
import enum
|
||||
import struct
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
@ -5,7 +6,7 @@ from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
class SetId:
|
||||
class SetId(enum.IntEnum):
|
||||
HK = 3
|
||||
|
||||
|
||||
|
@ -10,7 +10,12 @@ from tmtccmd.config.tmtc import (
|
||||
)
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import (
|
||||
create_mode_command,
|
||||
create_read_mode_command,
|
||||
create_announce_mode_command,
|
||||
create_announce_mode_recursive_command,
|
||||
)
|
||||
from tmtccmd.tc.pus_20_fsfw_param import (
|
||||
create_load_param_cmd,
|
||||
pack_scalar_u8_parameter_app_data,
|
||||
@ -23,11 +28,11 @@ class ParameterId(enum.IntEnum):
|
||||
|
||||
|
||||
class Submode(enum.IntEnum):
|
||||
RX_ONLY = 0
|
||||
RX_AND_TX_DEF_DATARATE = 1
|
||||
RX_AND_TX_LOW_DATARATE = 2
|
||||
RX_AND_TX_HIGH_DATARATE = 3
|
||||
RX_AND_TX_CARRIER_WAVE = 4
|
||||
RX_ONLY = 10
|
||||
RX_AND_TX_DEF_DATARATE = 11
|
||||
RX_AND_TX_LOW_DATARATE = 12
|
||||
RX_AND_TX_HIGH_DATARATE = 13
|
||||
RX_AND_TX_CARRIER_WAVE = 14
|
||||
|
||||
|
||||
class OpCode:
|
||||
@ -39,6 +44,9 @@ class OpCode:
|
||||
UPDATE_DEFAULT_DATARATE_LOW = "update_default_rate_low"
|
||||
UPDATE_DEFAULT_DATARATE_HIGH = "update_default_rate_high"
|
||||
CHANGE_TRANSMITTER_TIMEOUT = "change_transmitter_timeout"
|
||||
READ_MODE = "read_mode"
|
||||
ANNOUNCE_MODE = "announce_mode"
|
||||
ANNOUNCE_MODE_RECURSIVE = "announce_mode_recursive"
|
||||
|
||||
|
||||
class Info:
|
||||
@ -50,6 +58,9 @@ class Info:
|
||||
UPDATE_DEFAULT_DATARATE_LOW = "Configure default low datarate (BPSK modulation)"
|
||||
UPDATE_DEFAULT_DATARATE_HIGH = "Configure default high datarate (0QPSK modulation)"
|
||||
CHANGE_TRANSMITTER_TIMEOUT = "Changes the transmitter timeout"
|
||||
READ_MODE = "Read Mode"
|
||||
ANNOUNCE_MODE = "Announce Mode"
|
||||
ANNOUNCE_MODE_RECURSIVE = "Announce mode recursively"
|
||||
|
||||
|
||||
@service_provider(CustomServiceList.COM_SS)
|
||||
@ -117,6 +128,15 @@ def build_com_subsystem_cmd(p: ServiceProviderParams):
|
||||
)
|
||||
)
|
||||
)
|
||||
elif o == OpCode.READ_MODE:
|
||||
q.add_log_cmd(Info.READ_MODE)
|
||||
q.add_pus_tc(create_read_mode_command(COM_SUBSYSTEM_ID))
|
||||
elif o == OpCode.ANNOUNCE_MODE:
|
||||
q.add_log_cmd(Info.ANNOUNCE_MODE)
|
||||
q.add_pus_tc(create_announce_mode_command(COM_SUBSYSTEM_ID))
|
||||
elif o == OpCode.ANNOUNCE_MODE_RECURSIVE:
|
||||
q.add_log_cmd(Info.ANNOUNCE_MODE_RECURSIVE)
|
||||
q.add_pus_tc(create_announce_mode_recursive_command(COM_SUBSYSTEM_ID))
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
@ -128,4 +148,8 @@ def add_com_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce.add(OpCode.TX_AND_RX_DEF_RATE, Info.TX_AND_RX_DEF_DATARATE)
|
||||
oce.add(OpCode.UPDATE_DEFAULT_DATARATE_LOW, Info.UPDATE_DEFAULT_DATARATE_LOW)
|
||||
oce.add(OpCode.UPDATE_DEFAULT_DATARATE_HIGH, Info.UPDATE_DEFAULT_DATARATE_HIGH)
|
||||
oce.add(OpCode.CHANGE_TRANSMITTER_TIMEOUT, Info.CHANGE_TRANSMITTER_TIMEOUT)
|
||||
oce.add(OpCode.READ_MODE, Info.READ_MODE)
|
||||
oce.add(OpCode.ANNOUNCE_MODE, Info.ANNOUNCE_MODE)
|
||||
oce.add(OpCode.ANNOUNCE_MODE_RECURSIVE, Info.ANNOUNCE_MODE_RECURSIVE)
|
||||
defs.add_service(CustomServiceList.COM_SS, "COM Subsystem", oce)
|
||||
|
@ -6,6 +6,8 @@
|
||||
@date 13.12.2020
|
||||
"""
|
||||
import enum
|
||||
import logging
|
||||
import math
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
|
||||
@ -21,6 +23,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
create_request_one_diag_command,
|
||||
create_enable_periodic_hk_command_with_interval,
|
||||
create_disable_periodic_hk_command,
|
||||
create_request_one_hk_command,
|
||||
)
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_mode_command
|
||||
@ -31,7 +34,7 @@ from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
class SetId:
|
||||
class SetId(enum.IntEnum):
|
||||
RX_REGISTERS_DATASET = 1
|
||||
TX_REGISTERS_DATASET = 2
|
||||
TEMPERATURE_SET_ID = 3
|
||||
@ -46,6 +49,7 @@ class OpCode:
|
||||
NORMAL_RX_AND_TX_LOW_DATARATE = "nml_low_datarate"
|
||||
NORMAL_RX_AND_TX_HIGH_DATARATE = "nml_high_datarate"
|
||||
HK_RX_REGS = "hk_rx_regs"
|
||||
HK_TEMPS = "hk_temps"
|
||||
ENABLE_HK_RX_REGS = "enable_hk_rx"
|
||||
DISABLE_HK_RX_REGS = "disable_hk_rx"
|
||||
ENABLE_HK_TX_REGS = "enable_hk_tx"
|
||||
@ -65,6 +69,7 @@ class Info:
|
||||
NORMAL_RX_AND_TX_HIGH_DATARATE = "NORMAL RX and TX, TX with high datarate"
|
||||
HK_RX_REGS = "Request RX register set"
|
||||
HK_TX_REGS = "Request TX register set"
|
||||
HK_TEMPS = "Request Temperatures HK"
|
||||
ENABLE_HK_RX_REGS = "Enable periodic RX register HK"
|
||||
DISABLE_HK_RX_REGS = "Disable periodic RX register HK"
|
||||
ENABLE_HK_TX_REGS = "Enable periodic TX register HK"
|
||||
@ -123,6 +128,7 @@ def add_syrlinks_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce.add(OpCode.DISABLE_HK_RX_REGS, Info.DISABLE_HK_RX_REGS)
|
||||
oce.add(OpCode.ENABLE_HK_TX_REGS, Info.ENABLE_HK_TX_REGS)
|
||||
oce.add(OpCode.DISABLE_HK_TX_REGS, Info.DISABLE_HK_TX_REGS)
|
||||
oce.add(OpCode.HK_TEMPS, Info.HK_TEMPS)
|
||||
oce.add("7", "Syrlinks Handler: Read TX waveform")
|
||||
oce.add("8", "Syrlinks Handler: Read TX AGC value high byte")
|
||||
oce.add("9", "Syrlinks Handler: Read TX AGC value low byte")
|
||||
@ -176,6 +182,10 @@ def pack_syrlinks_command(
|
||||
q.add_log_cmd(f"{prefix}: {Info.HK_RX_REGS}")
|
||||
sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
|
||||
q.add_pus_tc(create_request_one_diag_command(sid))
|
||||
if op_code in OpCode.HK_TEMPS:
|
||||
q.add_log_cmd(f"{prefix}: {Info.HK_TEMPS}")
|
||||
sid = make_sid(obyt, SetId.TEMPERATURE_SET_ID)
|
||||
q.add_pus_tc(create_request_one_hk_command(sid))
|
||||
if op_code in OpCode.ENABLE_HK_RX_REGS:
|
||||
q.add_log_cmd(f"{prefix}: {Info.ENABLE_HK_RX_REGS}")
|
||||
sid = make_sid(obyt, SetId.RX_REGISTERS_DATASET)
|
||||
@ -257,11 +267,24 @@ def handle_syrlinks_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: byte
|
||||
return handle_syrlinks_rx_registers_dataset(printer, hk_data)
|
||||
elif set_id == SetId.TX_REGISTERS_DATASET:
|
||||
return handle_syrlinks_tx_registers_dataset(printer, hk_data)
|
||||
elif set_id == SetId.TEMPERATURE_SET_ID:
|
||||
return handle_syrlinks_temp_dataset(printer, hk_data)
|
||||
else:
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog(f"Service 3 TM: Syrlinks handler reply with unknown set ID {set_id}")
|
||||
|
||||
|
||||
def handle_syrlinks_temp_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
if len(hk_data) < 8:
|
||||
raise ValueError("expected at least 8 bytes of HK data")
|
||||
temp_power_amplifier = struct.unpack("!f", hk_data[0:4])[0]
|
||||
temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0]
|
||||
pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}")
|
||||
pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}")
|
||||
printer.print_validity_buffer(hk_data[8:], 2)
|
||||
|
||||
|
||||
def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = [
|
||||
@ -269,21 +292,41 @@ def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: byte
|
||||
"RX Sensitivity",
|
||||
"RX Frequency Shift",
|
||||
"RX IQ Power",
|
||||
"RX AGC Value",
|
||||
"RX AGC Value (Raw)",
|
||||
"RX Demod Eb",
|
||||
"RX Demod N0",
|
||||
"RX Datarate",
|
||||
"RX Datarate [kbps]",
|
||||
]
|
||||
rx_status = hk_data[0]
|
||||
carrier_detect = rx_status & 0b1
|
||||
carrier_lock = (rx_status >> 1) & 0b1
|
||||
data_lock = (rx_status >> 2) & 0b1
|
||||
data_valid = (rx_status >> 3) & 0b1
|
||||
rx_sensitivity = struct.unpack("!I", hk_data[1:5])[0]
|
||||
rx_frequency_shift = struct.unpack("!i", hk_data[5:9])[0]
|
||||
freq_shift_hz = rx_frequency_shift / 8.0
|
||||
freq_shift_printout = f"Raw: {rx_frequency_shift}, Eng: {freq_shift_hz} Hz"
|
||||
rx_iq_power = struct.unpack("!H", hk_data[9:11])[0]
|
||||
rx_agc_value = struct.unpack("!H", hk_data[11:13])[0]
|
||||
rx_demod_eb = struct.unpack("!I", hk_data[13:17])[0]
|
||||
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])[0]
|
||||
rx_data_rate = hk_data[21]
|
||||
rx_agc_inhibit = (rx_agc_value >> 15) & 0b1
|
||||
rx_agc = rx_agc_value & 0xFFF
|
||||
rx_demod_eb = struct.unpack("!I", hk_data[13:17])[0] & 0xFFFFFF
|
||||
rx_demod_n0 = struct.unpack("!I", hk_data[17:21])[0] & 0xFFFFFF
|
||||
eb_to_n0 = 20 * math.log10(rx_demod_eb / rx_demod_n0) - 3
|
||||
rx_data_rate_raw = hk_data[21]
|
||||
rx_data_rate = -1
|
||||
if rx_data_rate_raw == 0:
|
||||
rx_data_rate = 256
|
||||
elif rx_data_rate_raw == 1:
|
||||
rx_data_rate = 128
|
||||
elif rx_data_rate_raw == 3:
|
||||
rx_data_rate = 64
|
||||
elif rx_data_rate_raw == 7:
|
||||
rx_data_rate = 32
|
||||
elif rx_data_rate_raw == 15:
|
||||
rx_data_rate = 16
|
||||
elif rx_data_rate_raw == 31:
|
||||
rx_data_rate = 8
|
||||
content_list = [
|
||||
rx_status,
|
||||
rx_sensitivity,
|
||||
@ -298,6 +341,36 @@ def handle_syrlinks_rx_registers_dataset(printer: FsfwTmTcPrinter, hk_data: byte
|
||||
for header, content in zip(header_list, content_list):
|
||||
pw.dlog(f"{header}: {content}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
||||
pw.dlog(f"Carrier Detect: {carrier_detect}")
|
||||
pw.dlog(f"Carrier Lock: {carrier_lock}")
|
||||
pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}")
|
||||
pw.dlog(f"Data Valid (valid if TEB < 10e-5): {data_valid}")
|
||||
pw.dlog(f"Data Lock (data clock recovery loop lock status): {data_lock}")
|
||||
pw.dlog(f"RX AGC Inhibit: {rx_agc_inhibit}")
|
||||
pw.dlog(f"RX AGC: {rx_agc}")
|
||||
pw.dlog(f"Eb / E0RX [dB]: {eb_to_n0}")
|
||||
|
||||
|
||||
class TxConv(enum.IntEnum):
|
||||
NO_CODING = 0b000
|
||||
VITERBI_HALF_G1G2INV = 0b010
|
||||
VITERBI_HALF = 0b111
|
||||
|
||||
|
||||
class TxStatus(enum.IntEnum):
|
||||
NOT_AVAILABLE = 0b00
|
||||
MODULATION = 0b01
|
||||
CW = 0b10
|
||||
STANDBY = 0b11
|
||||
|
||||
|
||||
class TxCfgSet(enum.IntEnum):
|
||||
START_WITH_CURRENT_CFG = 0b00
|
||||
START_WITH_CONF_0 = 0b01
|
||||
START_WITH_CONF_1 = 0b10
|
||||
|
||||
|
||||
WAVEFORM_STRINGS = ["OFF", "CW", "QPSK", "0QPSK", "PCM/PM", "PSK/PM", "BPSK"]
|
||||
|
||||
|
||||
def handle_syrlinks_tx_registers_dataset(
|
||||
@ -305,12 +378,49 @@ def handle_syrlinks_tx_registers_dataset(
|
||||
hk_data: bytes,
|
||||
):
|
||||
pw = PrintWrapper(printer)
|
||||
header_list = ["TX Status", "TX Waveform", "TX AGC value"]
|
||||
header_list = ["TX Status Raw", "TX Waveform", "TX AGC value"]
|
||||
tx_status = hk_data[0]
|
||||
"""
|
||||
try:
|
||||
tx_conv = TxConv(tx_status & 0b111)
|
||||
except ValueError:
|
||||
logging.getLogger(__name__).warning(
|
||||
f"invalid TX conv value {tx_status & 0b111}"
|
||||
)
|
||||
tx_conv = -1
|
||||
tx_diff_encoder_enable = (tx_status >> 3) & 0b1
|
||||
"""
|
||||
tx_status_status = TxStatus(tx_status & 0b11)
|
||||
try:
|
||||
tx_conf_set = TxCfgSet((tx_status >> 2) & 0b11)
|
||||
except ValueError:
|
||||
logging.getLogger(__name__).warning(
|
||||
f"invalid TX conf set {(tx_status >> 2) & 0b11}"
|
||||
)
|
||||
tx_conf_set = -1
|
||||
tx_clock_detect = (tx_status >> 4) & 0b1
|
||||
tx_waveform = hk_data[1]
|
||||
waveform = tx_waveform & 0b1111
|
||||
try:
|
||||
waveform_str = WAVEFORM_STRINGS[waveform]
|
||||
except IndexError:
|
||||
logging.getLogger(__name__).warning(f"Unknown waveform value {waveform}")
|
||||
waveform_str = "Unknown"
|
||||
pcm_mode = (tx_waveform >> 4) & 0b1
|
||||
tx_agc_value = struct.unpack("!H", hk_data[2:4])[0]
|
||||
tx_agc_inhibit = (tx_agc_value >> 15) & 0b1
|
||||
tx_agc = tx_agc_value & 0xFFF
|
||||
content_list = [tx_status, tx_waveform, tx_agc_value]
|
||||
validity_buffer = hk_data[4:]
|
||||
for header, content in zip(header_list, content_list):
|
||||
pw.dlog(f"{header}: {content}")
|
||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
||||
# pw.dlog(f"TX CONV: {tx_conv!r}")
|
||||
# pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}")
|
||||
pw.dlog(f"TX Status: {tx_status_status!r}")
|
||||
pw.dlog(f"TX Config Set: {tx_conf_set!r}")
|
||||
pw.dlog(f"TX Clock Detect: {tx_clock_detect}")
|
||||
pw.dlog(f"Waveform: {waveform_str}")
|
||||
pw.dlog(f"PCM Mode: {pcm_mode}")
|
||||
pw.dlog(f"TX AGC Inhibit: {tx_agc_inhibit}")
|
||||
pw.dlog(f"TX AGC: {tx_agc}")
|
||||
|
@ -20,6 +20,8 @@ _LOGGER = logging.getLogger(__name__)
|
||||
|
||||
class ActionId(enum.IntEnum):
|
||||
LIST_DIR_INTO_FILE = 0
|
||||
ANNOUNCE_VERSION = 1
|
||||
ANNOUNCE_CURRENT_IMAGE = 2
|
||||
SWITCH_REBOOT_FILE_HANDLING = 5
|
||||
RESET_REBOOT_COUNTER = 6
|
||||
SWITCH_IMG_LOCK = 7
|
||||
@ -39,6 +41,8 @@ class SetId(enum.IntEnum):
|
||||
|
||||
|
||||
class OpCode:
|
||||
ANNOUNCE_VERSION = "announce_version"
|
||||
ANNOUNCE_CURRENT_IMAGE = "announce_current_image"
|
||||
REBOOT_XSC = ["0", "reboot_xsc"]
|
||||
XSC_REBOOT_SELF = ["1", "reboot_self"]
|
||||
XSC_REBOOT_0_0 = ["2", "reboot_00"]
|
||||
@ -64,6 +68,8 @@ class OpCode:
|
||||
|
||||
|
||||
class Info:
|
||||
ANNOUNCE_VERSION = "Announce version"
|
||||
ANNOUNCE_CURRENT_IMAGE = "Announce current image"
|
||||
REBOOT_XSC = "XSC reboot with prompt"
|
||||
REBOOT_FULL = "Full regular reboot"
|
||||
OBSW_UPDATE_FROM_SD_0 = "Update OBSW from SD Card 0"
|
||||
@ -89,6 +95,8 @@ class Copy(enum.IntEnum):
|
||||
@tmtc_definitions_provider
|
||||
def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.ANNOUNCE_VERSION, info=Info.ANNOUNCE_VERSION)
|
||||
oce.add(keys=OpCode.ANNOUNCE_CURRENT_IMAGE, info=Info.ANNOUNCE_CURRENT_IMAGE)
|
||||
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
|
||||
oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
|
||||
oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
|
||||
@ -142,6 +150,14 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
||||
|
||||
|
||||
def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
if op_code == OpCode.ANNOUNCE_VERSION:
|
||||
q.add_log_cmd(f"{Info.ANNOUNCE_VERSION}")
|
||||
q.add_pus_tc(create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_VERSION))
|
||||
if op_code == OpCode.ANNOUNCE_CURRENT_IMAGE:
|
||||
q.add_log_cmd(f"{Info.ANNOUNCE_CURRENT_IMAGE}")
|
||||
q.add_pus_tc(
|
||||
create_action_cmd(CORE_CONTROLLER_ID, ActionId.ANNOUNCE_CURRENT_IMAGE)
|
||||
)
|
||||
if op_code in OpCode.REBOOT_XSC:
|
||||
reboot_self, chip_select, copy_select = determine_reboot_params()
|
||||
add_xsc_reboot_cmd(
|
||||
|
@ -130,7 +130,7 @@ class SupvActionId:
|
||||
MEM_CHECK = 61
|
||||
|
||||
|
||||
class SetIds:
|
||||
class SetIds(enum.IntEnum):
|
||||
HK_REPORT = 102
|
||||
BOOT_STATUS_REPORT = 103
|
||||
|
||||
|
@ -12,22 +12,27 @@ from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservice
|
||||
|
||||
|
||||
class ModeId:
|
||||
OFF = 0
|
||||
SUPV_ONLY = 10
|
||||
MPSOC_STREAM = 11
|
||||
CAM_STREAM = 12
|
||||
EARTH_OBSV = 13
|
||||
SCEX = 14
|
||||
|
||||
|
||||
class OpCode(str, enum.Enum):
|
||||
OFF = "off"
|
||||
REPORT_ALL_MODES = "report_modes"
|
||||
|
||||
|
||||
class PayloadMode(enum.IntEnum):
|
||||
OFF = 0
|
||||
|
||||
|
||||
class Info(str, enum.Enum):
|
||||
OFF = "Off Command"
|
||||
REPORT_ALL_MODES = "Report all modes"
|
||||
|
||||
|
||||
HANDLER_LIST: Dict[str, Tuple[int, str]] = {
|
||||
OpCode.OFF: (PayloadMode.OFF, Info.OFF),
|
||||
OpCode.OFF: (ModeId.OFF, Info.OFF),
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
import enum
|
||||
import struct
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
@ -19,7 +20,7 @@ from tmtccmd.tc.pus_200_fsfw_mode import Subservice as ModeSubservices
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
class BpxSetId:
|
||||
class BpxSetId(enum.IntEnum):
|
||||
GET_HK_SET = 0
|
||||
GET_CFG_SET = 5
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
import enum
|
||||
import struct
|
||||
from typing import List
|
||||
|
||||
@ -52,6 +53,7 @@ class GsInfo:
|
||||
class PowerInfo:
|
||||
INFO_CORE = "Core Information"
|
||||
INFO_AUX = "Auxiliary Information"
|
||||
SWITCHER_HK = "Switcher State Information"
|
||||
INFO_ALL = "All Information"
|
||||
REQUEST_SWITCHER_SET = "Request Switcher Information"
|
||||
ENABLE_INFO_HK = "Enable Core Info HK"
|
||||
@ -102,7 +104,7 @@ class PowerOpCodes:
|
||||
|
||||
REBOOT = ["reboot"]
|
||||
INFO_CORE = ["info"]
|
||||
REQUEST_SWITCHER_SET = ["request_switchers"]
|
||||
SWITCHER_HK = ["switcher_states"]
|
||||
ENABLE_INFO_HK = ["info_hk_on"]
|
||||
DISABLE_INFO_HK = ["info_hk_off"]
|
||||
INFO_AUX = ["info_aux"]
|
||||
@ -115,7 +117,7 @@ class PowerOpCodes:
|
||||
PRINT_LATCHUPS = ["print_latchups"]
|
||||
|
||||
|
||||
class SetId:
|
||||
class SetId(enum.IntEnum):
|
||||
CORE = 1
|
||||
AUX = 2
|
||||
CONFIG = 3
|
||||
|
@ -453,18 +453,18 @@ def pack_pl_pcdu_mode_cmd(
|
||||
|
||||
|
||||
ADC_CHANNELS_NAMED = [
|
||||
"U BAT DIV 6",
|
||||
"U NEG V FB",
|
||||
"I HPA",
|
||||
"U HPA DIV 6",
|
||||
"I MPA",
|
||||
"U MPA DIV 6",
|
||||
"I TX",
|
||||
"U TX DIV 6",
|
||||
"I X8",
|
||||
"U X8 DIV 6",
|
||||
"I DRO",
|
||||
"U DRO DIV 6",
|
||||
"U BAT DIV 6 [V]",
|
||||
"U NEG V FB [V]",
|
||||
"I HPA [mA]",
|
||||
"U HPA DIV 6 [V]",
|
||||
"I MPA [mA]",
|
||||
"U MPA DIV 6 [V]",
|
||||
"I TX [mA]",
|
||||
"U TX DIV 6 [V]",
|
||||
"I X8 [mA]",
|
||||
"U X8 DIV 6 [V]",
|
||||
"I DRO [mA]",
|
||||
"U DRO DIV 6 [V]",
|
||||
]
|
||||
|
||||
|
||||
|
@ -13,7 +13,6 @@ from eive_tmtc.config.object_ids import (
|
||||
ACU_HANDLER_ID,
|
||||
PDU_1_HANDLER_ID,
|
||||
PDU_2_HANDLER_ID,
|
||||
PCDU_HANDLER_ID,
|
||||
get_object_ids,
|
||||
)
|
||||
from eive_tmtc.tmtc.power.pdu1 import (
|
||||
@ -29,10 +28,15 @@ from eive_tmtc.tmtc.power.pdu2 import (
|
||||
add_pdu2_cmds,
|
||||
)
|
||||
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
|
||||
from eive_tmtc.config.object_ids import PCDU_HANDLER_ID
|
||||
|
||||
from eive_tmtc.tmtc.power.p60dock import P60OpCode, P60Info, p60_dock_req_hk_cmds
|
||||
from eive_tmtc.tmtc.power.acu import add_acu_cmds, acu_req_hk_cmds
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_diag_command, make_sid
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
create_request_one_diag_command,
|
||||
make_sid,
|
||||
create_request_one_hk_command,
|
||||
)
|
||||
from tmtccmd.config.tmtc import tmtc_definitions_provider
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
|
||||
@ -41,9 +45,20 @@ class SetId(enum.IntEnum):
|
||||
SWITCHER_SET = 0
|
||||
|
||||
|
||||
class PcduSetIds:
|
||||
SWITCHER_SET = 0
|
||||
|
||||
|
||||
def pack_power_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
pdu1_switch_cmds(q, op_code)
|
||||
pdu2_switch_cmds(q, op_code)
|
||||
if op_code in PowerOpCodes.SWITCHER_HK:
|
||||
q.add_log_cmd("Requesting switcher state HK")
|
||||
q.add_pus_tc(
|
||||
create_request_one_diag_command(
|
||||
make_sid(PCDU_HANDLER_ID, PcduSetIds.SWITCHER_SET)
|
||||
)
|
||||
)
|
||||
if op_code in PowerOpCodes.INFO_CORE:
|
||||
pdu1_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
|
||||
pdu2_req_hk_cmds(q, PowerOpCodes.REQUEST_CORE_HK_ONCE[0])
|
||||
@ -73,13 +88,6 @@ def pack_power_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
pack_reset_gnd_wdt_cmd(q, "PDU1", oids[PDU_1_HANDLER_ID])
|
||||
pack_reset_gnd_wdt_cmd(q, "PDU2", oids[PDU_2_HANDLER_ID])
|
||||
q.add_wait_seconds(5.0)
|
||||
elif op_code in PowerOpCodes.REQUEST_SWITCHER_SET:
|
||||
q.add_log_cmd("PCDU: Requesting Switcher Set")
|
||||
q.add_pus_tc(
|
||||
create_request_one_diag_command(
|
||||
make_sid(PCDU_HANDLER_ID, SetId.SWITCHER_SET)
|
||||
)
|
||||
)
|
||||
if q.empty():
|
||||
logging.getLogger(__name__).info(
|
||||
f"Queue is empty, no stack for op code {op_code}"
|
||||
@ -105,10 +113,10 @@ def add_power_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
add_pdu1_common_defs(oce)
|
||||
add_pdu2_common_defs(oce)
|
||||
oce.add(keys=PowerOpCodes.SWITCHER_HK, info=PowerInfo.SWITCHER_HK)
|
||||
oce.add(keys=PowerOpCodes.INFO_ALL, info=PowerInfo.INFO_ALL)
|
||||
oce.add(keys=PowerOpCodes.INFO_CORE, info=PowerInfo.INFO_CORE)
|
||||
oce.add(keys=PowerOpCodes.INFO_AUX, info=PowerInfo.INFO_AUX)
|
||||
oce.add(keys=PowerOpCodes.REQUEST_SWITCHER_SET, info=PowerInfo.REQUEST_SWITCHER_SET)
|
||||
oce.add(keys=PowerOpCodes.RESET_ALL_GND_WDTS, info=PowerInfo.RESET_ALL_GND_WDTS)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.POWER.value,
|
||||
|
@ -7,6 +7,7 @@ from eive_tmtc.tmtc.power.common_power import (
|
||||
unpack_array_in_data,
|
||||
OBC_ENDIANNESS,
|
||||
)
|
||||
from eive_tmtc.tmtc.power.power import PcduSetIds
|
||||
from tmtccmd.util import ObjectIdBase
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
@ -554,3 +555,24 @@ def parse_name_list(data: bytes, name_len: int):
|
||||
idx += len(name)
|
||||
idx += 1
|
||||
return ch_list
|
||||
|
||||
|
||||
def handle_pcdu_hk(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog("Received PCDU HK")
|
||||
if set_id == PcduSetIds.SWITCHER_SET:
|
||||
current_idx = 0
|
||||
pdu1_vals = [hk_data[i] for i in range(len(PDU1_CHANNELS_NAMES))]
|
||||
current_idx += len(PDU1_CHANNELS_NAMES)
|
||||
pdu2_vals = [hk_data[i + current_idx] for i in range(len(PDU2_CHANNELS_NAMES))]
|
||||
current_idx += len(PDU2_CHANNELS_NAMES)
|
||||
p60_stack_val = hk_data[current_idx]
|
||||
current_idx += 1
|
||||
pw.dlog("PDU1 Switcher States")
|
||||
for name, val in zip(PDU1_CHANNELS_NAMES, pdu1_vals):
|
||||
pw.dlog(f"{name.ljust(25)}: {val}")
|
||||
pw.dlog("PDU2 Switcher States")
|
||||
for name, val in zip(PDU2_CHANNELS_NAMES, pdu2_vals):
|
||||
pw.dlog(f"{name.ljust(25)}: {val}")
|
||||
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_val}")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], 3)
|
||||
|
57
eive_tmtc/tmtc/system.py
Normal file
57
eive_tmtc/tmtc/system.py
Normal file
@ -0,0 +1,57 @@
|
||||
from eive_tmtc.config.definitions import CustomServiceList
|
||||
from eive_tmtc.tmtc.acs.subsystem import AcsMode
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import service_provider
|
||||
from eive_tmtc.config.object_ids import EIVE_SYSTEM_ID
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import (
|
||||
create_mode_command,
|
||||
Mode,
|
||||
create_announce_mode_recursive_command,
|
||||
)
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
|
||||
|
||||
class OpCode:
|
||||
SAFE_MODE = "safe"
|
||||
IDLE_MODE = "idle"
|
||||
ANNOUNCE_MODES = "announce_modes"
|
||||
|
||||
|
||||
class Info:
|
||||
SAFE_MODE = "Command System into Safe Mode"
|
||||
IDLE_MODE = "Command System into Idle Pointing Mode"
|
||||
ANNOUNCE_MODES = "Announce mode recursively"
|
||||
|
||||
|
||||
@service_provider(CustomServiceList.SYSTEM.value)
|
||||
def build_system_cmds(p: ServiceProviderParams):
|
||||
o = p.op_code
|
||||
q = p.queue_helper
|
||||
prefix = "EIVE System"
|
||||
if o == OpCode.SAFE_MODE:
|
||||
q.add_log_cmd(f"{prefix}: {Info.SAFE_MODE}")
|
||||
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, AcsMode.SAFE, 0))
|
||||
elif o == OpCode.IDLE_MODE:
|
||||
q.add_log_cmd(f"{prefix}: {Info.IDLE_MODE}")
|
||||
q.add_pus_tc(create_mode_command(EIVE_SYSTEM_ID, AcsMode.IDLE, 0))
|
||||
elif o == OpCode.ANNOUNCE_MODES:
|
||||
q.add_log_cmd(f"{prefix}: {Info.ANNOUNCE_MODES}")
|
||||
q.add_pus_tc(create_announce_mode_recursive_command(EIVE_SYSTEM_ID))
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_system_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.SAFE_MODE, info=Info.SAFE_MODE)
|
||||
oce.add(keys=OpCode.IDLE_MODE, info=Info.IDLE_MODE)
|
||||
oce.add(keys=OpCode.ANNOUNCE_MODES, info=Info.ANNOUNCE_MODES)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.SYSTEM.value,
|
||||
info="EIVE system commands",
|
||||
op_code_entry=oce,
|
||||
)
|
||||
pass
|
@ -5,3 +5,4 @@ class CtrlSetId(enum.IntEnum):
|
||||
PRIMARY_SENSORS = 0
|
||||
DEVICE_SENSORS = 1
|
||||
SUS_TEMP_SENSORS = 2
|
||||
HEATER_INFO = 4
|
||||
|
@ -32,6 +32,18 @@ class Heater(enum.IntEnum):
|
||||
NUMBER_OF_SWITCHES = 8
|
||||
|
||||
|
||||
HEATER_LOCATION = [
|
||||
"OBC Board",
|
||||
"PLOC Processing Board",
|
||||
"ACS Board",
|
||||
"PCDU PDU",
|
||||
"Camera",
|
||||
"Startracker",
|
||||
"DRO",
|
||||
"HPA",
|
||||
]
|
||||
|
||||
|
||||
class OpCode:
|
||||
HEATER_CMD = ["switch_cmd"]
|
||||
HEATER_EXT_CTRL = ["set_ext_ctrl"]
|
||||
|
@ -9,8 +9,12 @@ from tmtccmd.config.tmtc import (
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, create_announce_mode_recursive_command
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
make_sid,
|
||||
generate_one_hk_command,
|
||||
create_request_one_diag_command,
|
||||
)
|
||||
|
||||
|
||||
class OpCodeSys:
|
||||
@ -19,6 +23,8 @@ class OpCodeSys:
|
||||
REQUEST_PRIMARY_TEMP_SET = ["temp"]
|
||||
REQUEST_DEVICE_TEMP_SET = ["temp_devs"]
|
||||
REQUEST_DEVICE_SUS_SET = ["temp_sus"]
|
||||
REQUEST_HEATER_INFO = "heater_info"
|
||||
ANNOUNCE_MODES = "announce_modes"
|
||||
|
||||
|
||||
class InfoSys:
|
||||
@ -27,6 +33,8 @@ class InfoSys:
|
||||
REQUEST_PRIMARY_TEMP_SET = "Request HK set of primary sensor temperatures"
|
||||
REQUEST_DEVICE_TEMP_SET = "Request HK set of device sensor temperatures"
|
||||
REQUEST_DEVICE_SUS_SET = "Request HK set of the SUS temperatures"
|
||||
REQUEST_HEATER_INFO = "Request heater information"
|
||||
ANNOUNCE_MODES = "Announce Modes recursively"
|
||||
|
||||
|
||||
def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
@ -46,12 +54,22 @@ def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
|
||||
make_sid(TCS_CONTROLLER, CtrlSetId.SUS_TEMP_SENSORS)
|
||||
)
|
||||
)
|
||||
if op_code == OpCodeSys.REQUEST_HEATER_INFO:
|
||||
q.add_log_cmd(InfoSys.REQUEST_HEATER_INFO)
|
||||
q.add_pus_tc(
|
||||
create_request_one_diag_command(
|
||||
make_sid(TCS_CONTROLLER, CtrlSetId.HEATER_INFO)
|
||||
)
|
||||
)
|
||||
if op_code in OpCodeSys.OFF:
|
||||
q.add_log_cmd(InfoSys.OFF)
|
||||
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.OFF, 0, q, InfoSys.OFF)
|
||||
if op_code in OpCodeSys.NML:
|
||||
q.add_log_cmd(InfoSys.NML)
|
||||
pack_mode_cmd_with_info(TCS_SUBSYSTEM_ID, Mode.NORMAL, 0, q, InfoSys.OFF)
|
||||
if op_code == OpCodeSys.ANNOUNCE_MODES:
|
||||
q.add_log_cmd(InfoSys.ANNOUNCE_MODES)
|
||||
q.add_pus_tc(create_announce_mode_recursive_command(TCS_SUBSYSTEM_ID))
|
||||
pack_tcs_ass_cmds(q, op_code)
|
||||
|
||||
|
||||
@ -67,6 +85,8 @@ def add_tcs_subsystem_cmds(defs: TmtcDefinitionWrapper):
|
||||
keys=OpCodeSys.REQUEST_DEVICE_TEMP_SET, info=InfoSys.REQUEST_DEVICE_TEMP_SET
|
||||
)
|
||||
oce.add(keys=OpCodeSys.REQUEST_DEVICE_SUS_SET, info=InfoSys.REQUEST_DEVICE_SUS_SET)
|
||||
oce.add(keys=OpCodeSys.REQUEST_HEATER_INFO, info=InfoSys.REQUEST_HEATER_INFO)
|
||||
oce.add(keys=OpCodeSys.ANNOUNCE_MODES, info=InfoSys.ANNOUNCE_MODES)
|
||||
defs.add_service(
|
||||
name=CustomServiceList.TCS,
|
||||
info="TCS Board",
|
||||
|
@ -1,10 +1,16 @@
|
||||
import logging
|
||||
import pprint
|
||||
import struct
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.fsfw import validity_buffer_list
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
from .defs import CtrlSetId
|
||||
from .heater import HEATER_LOCATION
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def handle_thermal_controller_hk_data(
|
||||
@ -22,7 +28,10 @@ def handle_thermal_controller_hk_data(
|
||||
pw.dlog("Received sensor temperature data")
|
||||
|
||||
# get all the floats
|
||||
tm_data = struct.unpack("!fffffffffffffffffffff", hk_data[: 21 * 4])
|
||||
fmt_str = "!fffffffffffffffffffff"
|
||||
fmt_len = struct.calcsize(fmt_str)
|
||||
tm_data = struct.unpack(fmt_str, hk_data[:fmt_len])
|
||||
valid_list = validity_buffer_list(hk_data[fmt_len:], 21)
|
||||
parsed_data = {
|
||||
"SENSOR_PLOC_HEATSPREADER": tm_data[0],
|
||||
"SENSOR_PLOC_MISSIONBOARD": tm_data[1],
|
||||
@ -46,14 +55,15 @@ def handle_thermal_controller_hk_data(
|
||||
"TMP1075 PL PCDU 1": tm_data[19],
|
||||
"TMP1075 IF BOARD": tm_data[20],
|
||||
}
|
||||
printer.file_logger.info(str(parsed_data))
|
||||
pp = pprint.PrettyPrinter(depth=4)
|
||||
pp.pprint(parsed_data)
|
||||
for idx, (k, v) in enumerate(parsed_data.items()):
|
||||
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
|
||||
elif set_id == CtrlSetId.DEVICE_SENSORS:
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog("Received device temperature data")
|
||||
fmt_str = "!fhhhhiiiifffhffffffffffffff"
|
||||
fmt_len = struct.calcsize(fmt_str)
|
||||
tm_data = struct.unpack(fmt_str, hk_data[:98])
|
||||
valid_list = validity_buffer_list(hk_data[fmt_len:], 25)
|
||||
parsed_data = {
|
||||
"Q7S_TEMPERATURE": tm_data[0],
|
||||
"BATTERY_TEMPERATURE_1": tm_data[1],
|
||||
@ -68,9 +78,7 @@ def handle_thermal_controller_hk_data(
|
||||
"SYRLINKS_POWER_AMPLIFIER_TEMPERATURE": tm_data[10],
|
||||
"SYRLINKS_BASEBAND_BOARD_TEMPERATURE": tm_data[11],
|
||||
"MGT_TEMPERATURE": tm_data[12],
|
||||
"ACU_TEMPERATURE_1": tm_data[13],
|
||||
"ACU_TEMPERATURE_2": tm_data[14],
|
||||
"ACU_TEMPERATURE_3": tm_data[15],
|
||||
"ACU_TEMPERATURES": (tm_data[13], tm_data[14], tm_data[15]),
|
||||
"PDU1_TEMPERATURE": tm_data[16],
|
||||
"PDU2_TEMPERATURE": tm_data[17],
|
||||
"P60DOCK_TEMPERATURE_1": tm_data[18],
|
||||
@ -79,18 +87,19 @@ def handle_thermal_controller_hk_data(
|
||||
"GYRO_1_TEMPERATURE": tm_data[21],
|
||||
"GYRO_2_TEMPERATURE": tm_data[22],
|
||||
"GYRO_3_TEMPERATURE": tm_data[23],
|
||||
"MGM_0_TEMPERATURE": tm_data[24],
|
||||
"MGM_1_TEMPERATURE": tm_data[25],
|
||||
"MGM_0_LIS3_TEMPERATURE": tm_data[24],
|
||||
"MGM_2_LIS3_TEMPERATURE": tm_data[25],
|
||||
"ADC_PL_PCDU_TEMPERATURE": tm_data[26],
|
||||
}
|
||||
printer.file_logger.info(str(parsed_data))
|
||||
pp = pprint.PrettyPrinter(depth=4)
|
||||
pp.pprint(parsed_data)
|
||||
for idx, (k, v) in enumerate(parsed_data.items()):
|
||||
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
|
||||
elif set_id == CtrlSetId.SUS_TEMP_SENSORS:
|
||||
pw = PrintWrapper(printer)
|
||||
pw.dlog("Received SUS temperature data")
|
||||
fmt_str = "!ffffffffffffffffff"
|
||||
tm_data = struct.unpack(fmt_str, hk_data[: 4 * 18])
|
||||
fmt_str = "!ffffffffffff"
|
||||
fmt_len = struct.calcsize(fmt_str)
|
||||
tm_data = struct.unpack(fmt_str, hk_data[: 12 * 4])
|
||||
valid_list = validity_buffer_list(hk_data[fmt_len:], 12)
|
||||
parsed_data = {
|
||||
"SUS_0": tm_data[0],
|
||||
"SUS_1": tm_data[1],
|
||||
@ -105,6 +114,15 @@ def handle_thermal_controller_hk_data(
|
||||
"SUS_10": tm_data[10],
|
||||
"SUS_11": tm_data[11],
|
||||
}
|
||||
printer.file_logger.info(str(parsed_data))
|
||||
pp = pprint.PrettyPrinter(depth=4)
|
||||
pp.pprint(parsed_data)
|
||||
for idx, (k, v) in enumerate(parsed_data.items()):
|
||||
print(f"{str(k).ljust(30)}: Valid: {valid_list[idx]}, Value: {v}")
|
||||
elif set_id == CtrlSetId.HEATER_INFO:
|
||||
print("Heater Switch States")
|
||||
for i in range(8):
|
||||
print(
|
||||
f"{HEATER_LOCATION[i].ljust(25)}: {'On' if hk_data[i] == 1 else 'Off'}"
|
||||
)
|
||||
current_draw = struct.unpack("!H", hk_data[8:10])[0]
|
||||
print(f"Heater Power Channel Current Draw: {current_draw} mA")
|
||||
else:
|
||||
_LOGGER.warning(f"Unimplemented set ID {set_id}")
|
||||
|
@ -7,27 +7,31 @@
|
||||
"""
|
||||
import enum
|
||||
|
||||
from eive_tmtc.config.definitions import CustomServiceList
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from tmtccmd.config.tmtc import (
|
||||
tmtc_definitions_provider,
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_data
|
||||
from tmtccmd.pus.s8_fsfw_funccmd import make_action_id
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import create_request_one_hk_command, make_sid
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
|
||||
|
||||
class Tmp1075TestProcedure:
|
||||
"""
|
||||
@brief Use this class to define the tests to perform for the Tmp1075.
|
||||
@details Setting all to True will run all tests.
|
||||
Setting all to False will only run the tests set to True.
|
||||
"""
|
||||
class OpCode:
|
||||
OFF = "off"
|
||||
ON = "on"
|
||||
NML = "nml"
|
||||
HK = "hk"
|
||||
|
||||
all = False
|
||||
start_adc_conversion = False
|
||||
get_temp = False
|
||||
set_mode_normal = (
|
||||
True # Setting mode to normal starts continuous temperature reading
|
||||
)
|
||||
set_mode_on = False # If mode is MODE_ON, temperature will only be read on command
|
||||
|
||||
class Info:
|
||||
OFF = "Off"
|
||||
ON = "On"
|
||||
NML = "Normal"
|
||||
HK = "HK"
|
||||
|
||||
|
||||
class Tmp1075ActionId(enum.IntEnum):
|
||||
@ -35,30 +39,40 @@ class Tmp1075ActionId(enum.IntEnum):
|
||||
START_ADC_CONV = 2
|
||||
|
||||
|
||||
class SetId:
|
||||
TMEPERATURE = 1
|
||||
|
||||
|
||||
def pack_tmp1075_test_into(
|
||||
object_id: ObjectIdU32, op_code: str, q: DefaultPusQueueHelper
|
||||
):
|
||||
q.add_log_cmd(
|
||||
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
|
||||
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id}"
|
||||
)
|
||||
obyt = object_id.as_bytes
|
||||
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.start_adc_conversion:
|
||||
q.add_log_cmd("TMP1075: Starting new temperature conversion")
|
||||
command = obyt + make_action_id(Tmp1075ActionId.GET_TEMP)
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
if Tmp1075TestProcedure.all or Tmp1075TestProcedure.get_temp:
|
||||
q.add_log_cmd("TMP1075: Read temperature")
|
||||
command = obyt + make_action_id(Tmp1075ActionId.START_ADC_CONV)
|
||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
|
||||
|
||||
if Tmp1075TestProcedure.set_mode_normal:
|
||||
if op_code == OpCode.OFF:
|
||||
q.add_log_cmd("TMP1075: Set Normal Off")
|
||||
mode_data = pack_mode_data(obyt, Mode.OFF, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
|
||||
if op_code == OpCode.NML:
|
||||
q.add_log_cmd("TMP1075: Set Mode Normal")
|
||||
mode_data = pack_mode_data(obyt, Mode.NORMAL, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
|
||||
|
||||
if Tmp1075TestProcedure.set_mode_on:
|
||||
if op_code == OpCode.ON:
|
||||
q.add_log_cmd("TMP1075: Set Mode On")
|
||||
mode_data = pack_mode_data(obyt, Mode.ON, 0)
|
||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
|
||||
|
||||
if op_code == OpCode.HK:
|
||||
q.add_log_cmd("TMP1075: Request One-Shot HK")
|
||||
q.add_pus_tc(create_request_one_hk_command(make_sid(obyt, SetId.TMEPERATURE)))
|
||||
return q
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_tmp_sens_cmds(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(OpCode.OFF, Info.OFF)
|
||||
oce.add(OpCode.ON, Info.ON)
|
||||
oce.add(OpCode.NML, Info.NML)
|
||||
oce.add(OpCode.HK, Info.HK)
|
||||
defs.add_service(CustomServiceList.TMP1075.value, "TMP1075 Temperature Sensor", oce)
|
||||
|
164
eive_tmtc/tmtc/tm_store.py
Normal file
164
eive_tmtc/tmtc/tm_store.py
Normal file
@ -0,0 +1,164 @@
|
||||
import datetime
|
||||
import enum
|
||||
import logging
|
||||
import math
|
||||
import struct
|
||||
|
||||
from eive_tmtc.config.object_ids import (
|
||||
HK_TM_STORE,
|
||||
MISC_TM_STORE,
|
||||
OK_TM_STORE,
|
||||
NOT_OK_TM_STORE,
|
||||
CFDP_TM_STORE,
|
||||
get_object_ids,
|
||||
)
|
||||
from eive_tmtc.config.definitions import CustomServiceList
|
||||
from tmtccmd.config import TmtcDefinitionWrapper
|
||||
from tmtccmd.config.tmtc import tmtc_definitions_provider, OpCodeEntry
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from dateutil.parser import parse
|
||||
|
||||
from spacepackets.ecss import PusService # noqa
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from spacepackets.ecss.pus_15_tm_storage import Subservice
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
|
||||
|
||||
class OpCode:
|
||||
RETRIEVAL_BY_TIME_RANGE = "retrieval_time_range"
|
||||
DELETE_UP_TO = "delete_up_to"
|
||||
|
||||
|
||||
class Info:
|
||||
RETRIEVAL_BY_TIME_RANGE = "Dump Telemetry Packets by time range"
|
||||
DELETE_UP_TO = "Delete Telemetry Packets up to time"
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@service_provider(CustomServiceList.TM_STORE)
|
||||
def pack_tm_store_commands(p: ServiceProviderParams):
|
||||
q = p.queue_helper
|
||||
o = p.op_code
|
||||
if o == OpCode.DELETE_UP_TO:
|
||||
obj_id, store_string = store_select_prompt()
|
||||
app_data = bytearray(obj_id.as_bytes)
|
||||
delete_up_to_time = time_prompt()
|
||||
end_stamp = int(math.floor(delete_up_to_time.timestamp()))
|
||||
app_data.extend(struct.pack("!I", end_stamp))
|
||||
q.add_log_cmd(Info.DELETE_UP_TO)
|
||||
q.add_log_cmd(f"Selected Store: {obj_id}")
|
||||
q.add_log_cmd(
|
||||
f"Deletion up to time "
|
||||
f"{datetime.datetime.fromtimestamp(end_stamp, tz=datetime.timezone.utc)}"
|
||||
)
|
||||
q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=15, subservice=Subservice.DELETE_UP_TO, app_data=app_data
|
||||
)
|
||||
)
|
||||
elif o == OpCode.RETRIEVAL_BY_TIME_RANGE:
|
||||
q.add_log_cmd(Info.RETRIEVAL_BY_TIME_RANGE)
|
||||
pass
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_persistent_tm_store_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.DELETE_UP_TO, info=Info.DELETE_UP_TO)
|
||||
oce.add(keys=OpCode.RETRIEVAL_BY_TIME_RANGE, info=Info.RETRIEVAL_BY_TIME_RANGE)
|
||||
defs.add_service(
|
||||
CustomServiceList.TM_STORE, "Persistent TM Store", op_code_entry=oce
|
||||
)
|
||||
|
||||
|
||||
class TmStoreSelect(enum.IntEnum):
|
||||
OK_TM_STORE = 0
|
||||
NOT_OK_TM_STORE = 1
|
||||
MISC_TM_STORE = 2
|
||||
HK_TM_STORE = 3
|
||||
CFDP_TM_STORE = 4
|
||||
|
||||
|
||||
STORE_DICT = {
|
||||
TmStoreSelect.OK_TM_STORE: (OK_TM_STORE, "OK Store (Verification)"),
|
||||
TmStoreSelect.NOT_OK_TM_STORE: (
|
||||
NOT_OK_TM_STORE,
|
||||
"NOT OK Store (Events, Verification Failures..)",
|
||||
),
|
||||
TmStoreSelect.MISC_TM_STORE: (MISC_TM_STORE, "Miscellaneous Store"),
|
||||
TmStoreSelect.HK_TM_STORE: (HK_TM_STORE, "HK TM Store"),
|
||||
TmStoreSelect.CFDP_TM_STORE: (CFDP_TM_STORE, "CFDP TM Store"),
|
||||
}
|
||||
|
||||
|
||||
TIME_INPUT_DICT = {
|
||||
1: "Full manual input with dateutil.parser.parse",
|
||||
2: "Offset from now in seconds",
|
||||
}
|
||||
|
||||
|
||||
def time_prompt() -> datetime.datetime:
|
||||
print("Available time input types: ")
|
||||
for k, v in TIME_INPUT_DICT.items():
|
||||
print(f" {k}: {v}")
|
||||
while True:
|
||||
time_input_key = int(input("Please specify a time input type by key: "))
|
||||
if time_input_key not in TIME_INPUT_DICT.keys():
|
||||
_LOGGER.warning("Invalid key, try again")
|
||||
continue
|
||||
break
|
||||
if time_input_key == 1:
|
||||
return time_prompt_fully_manually()
|
||||
elif time_input_key == 2:
|
||||
return time_prompt_offset_from_now()
|
||||
|
||||
|
||||
def time_prompt_fully_manually() -> datetime.datetime:
|
||||
# TODO: Add support for offset from now in seconds
|
||||
time = parse(
|
||||
input(
|
||||
"Please enter the time in any format supported by dateutil.parser.parse\n"
|
||||
"Recommended format: UTC ISO format YYYY-MM-DDThh:mm:ssZ: "
|
||||
)
|
||||
)
|
||||
print(f"Parsed timestamp: {time}")
|
||||
return time
|
||||
|
||||
|
||||
def time_prompt_offset_from_now() -> datetime.datetime:
|
||||
seconds_offset = math.floor(
|
||||
float(
|
||||
input(
|
||||
"Please enter the time as a offset from now in seconds. Negative offset is allowed: "
|
||||
)
|
||||
)
|
||||
)
|
||||
time_now_with_offset = datetime.datetime.now(
|
||||
tz=datetime.timezone.utc
|
||||
) + datetime.timedelta(seconds=seconds_offset)
|
||||
print(f"Absolute resulting time: {time_now_with_offset}")
|
||||
return time_now_with_offset
|
||||
|
||||
|
||||
def store_select_prompt() -> (ObjectIdU32, str):
|
||||
obj_id_dict = get_object_ids()
|
||||
print("Available TM stores:")
|
||||
idx_to_obj_id = dict()
|
||||
for k, v in STORE_DICT.items():
|
||||
print(f" {k}: {v[1]}")
|
||||
while True:
|
||||
target_index = int(
|
||||
input("Please enter the target store for the TM store transaction: ")
|
||||
)
|
||||
desc_and_obj_id = STORE_DICT.get(TmStoreSelect(target_index))
|
||||
if desc_and_obj_id is None:
|
||||
_LOGGER.warning("Invalid index. Try again")
|
||||
continue
|
||||
break
|
||||
obj_id_raw = desc_and_obj_id[0]
|
||||
obj_id = obj_id_dict.get(obj_id_raw)
|
||||
print(f"Selected store: {obj_id} ({desc_and_obj_id[1]})")
|
||||
return obj_id_dict.get(obj_id_raw), desc_and_obj_id[1]
|
Reference in New Issue
Block a user