Compare commits
24 Commits
9720fcddec
...
481e57be59
Author | SHA1 | Date | |
---|---|---|---|
481e57be59 | |||
039eed1e16 | |||
001270a1e5 | |||
b443867512 | |||
9867ee873a | |||
aa427151f5 | |||
0ce32521f4 | |||
649c019eb8 | |||
fa5c24df6b | |||
a13454851b | |||
78b55f919c | |||
3259cf0807 | |||
8919f5c8e9 | |||
736bc23fc5 | |||
02797ffc5b | |||
6a30f6d0ac | |||
11454f6015 | |||
3f71475860 | |||
7120f14601 | |||
7a82a77876 | |||
23aeb8bcf9 | |||
bc9f13f296 | |||
09dbc6b14d | |||
31d51e3660 |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -10,9 +10,20 @@ list yields a list of all related PRs for each release.
|
||||
|
||||
# [unreleased]
|
||||
|
||||
## Added
|
||||
|
||||
- Added ACS action cmds
|
||||
- Added new ACS hk values
|
||||
- Added ACS set parameter cmds
|
||||
|
||||
## Fixed
|
||||
|
||||
- Correction for ACS CTRL raw data requests HK type
|
||||
- Fixed diag related ACS hk cmds
|
||||
|
||||
## Added
|
||||
|
||||
- Basic MGM commanding (modes)
|
||||
|
||||
# [v2.16.1] 2023-02-24
|
||||
|
||||
@@ -312,4 +323,4 @@ tmtccmd v4.0.0rc0
|
||||
- Extended heater commands for more informative output which component is heated
|
||||
|
||||
See [milestones](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/milestones)
|
||||
and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases)
|
||||
and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases)
|
||||
|
@@ -37,6 +37,7 @@ class CustomServiceList(str, enum.Enum):
|
||||
ACU = "acu"
|
||||
ACS = "acs"
|
||||
GYRO = "gyro"
|
||||
MGMS = "mgms"
|
||||
COM_SS = "com"
|
||||
BPX_BATTERY = "bpx"
|
||||
HEATER = "heater"
|
||||
|
@@ -4,6 +4,7 @@ import logging
|
||||
from typing import cast
|
||||
|
||||
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
|
||||
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
|
||||
from eive_tmtc.tmtc.power.power import pack_power_commands
|
||||
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
|
||||
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
|
||||
@@ -135,6 +136,8 @@ def handle_default_procedure(
|
||||
return pack_single_rw_test_into(
|
||||
object_id=RW4_ID, rw_idx=4, q=queue_helper, op_code=op_code
|
||||
)
|
||||
if service == CustomServiceList.MGMS.value:
|
||||
return handle_mgm_cmd(q=queue_helper, op_code=op_code)
|
||||
if service == CustomServiceList.RAD_SENSOR.value:
|
||||
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
|
||||
return pack_rad_sensor_test_into(
|
||||
|
@@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import (
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.tc import service_provider
|
||||
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
|
||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
@@ -23,8 +24,25 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||
disable_periodic_hk_command,
|
||||
create_request_one_diag_command,
|
||||
)
|
||||
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
from tmtccmd.tc.pus_20_fsfw_param import (
|
||||
create_load_param_cmd
|
||||
)
|
||||
|
||||
from tmtccmd.pus.s20_fsfw_param_defs import (
|
||||
create_scalar_u8_parameter,
|
||||
create_scalar_u16_parameter,
|
||||
create_scalar_i32_parameter,
|
||||
create_scalar_float_parameter,
|
||||
create_scalar_double_parameter,
|
||||
create_vector_float_parameter,
|
||||
create_vector_double_parameter,
|
||||
create_matrix_float_parameter,
|
||||
create_matrix_double_parameter,
|
||||
)
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
MGM_RAW_SET = 0
|
||||
@@ -49,12 +67,24 @@ class Submode(enum.IntEnum):
|
||||
PTG_TARGET_GS = 15
|
||||
PTG_INERTIAL = 16
|
||||
|
||||
class ActionId(enum.IntEnum):
|
||||
SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL = 0
|
||||
RESET_MEKF = 1
|
||||
|
||||
class OpCodes:
|
||||
OFF = ["off"]
|
||||
SAFE = ["normal_safe"]
|
||||
DTBL = ["normal_detumble"]
|
||||
IDLE = ["normal_idle"]
|
||||
NADIR = ["normal_nadir"]
|
||||
TARGET = ["normal_target"]
|
||||
GS = ["normal_gs"]
|
||||
INERTIAL = ["normal_inertial"]
|
||||
SAFE_PTG = ["confirm_deployment"]
|
||||
RESET_MEKF = ["reset_mekf"]
|
||||
SET_PARAMETER_SCALAR = ["set_scalar_param"]
|
||||
SET_PARAMETER_VECTOR = ["set_vector_param"]
|
||||
SET_PARAMETER_MATRIX = ["set_matrix_param"]
|
||||
REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"]
|
||||
ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"]
|
||||
DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"]
|
||||
@@ -89,9 +119,18 @@ class OpCodes:
|
||||
|
||||
class Info:
|
||||
OFF = "Switch ACS CTRL off"
|
||||
SAFE = "Switch ACS CTRL normal safe"
|
||||
DTBL = "Switch ACS CTRL normal detumble"
|
||||
IDLE = "Switch ACS CTRL normal idle"
|
||||
SAFE = "Switch ACS CTRL normal - safe"
|
||||
DTBL = "Switch ACS CTRL normal - detumble"
|
||||
IDLE = "Switch ACS CTRL normal - idle"
|
||||
NADIR = "Switch ACS CTRL normal - pointing nadir"
|
||||
TARGET = "Switch ACS CTRL normal - pointing target"
|
||||
GS = "Switch ACS CTRL normal - pointing target groundstation"
|
||||
INERTIAL = "Switch ACS CTRL normal - pointing inertial"
|
||||
SAFE_PTG = "Confirm deployment of both solar arrays"
|
||||
RESET_MEKF = "Reset the MEKF"
|
||||
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
|
||||
SET_PARAMETER_VECTOR = "Set Vector Parameter"
|
||||
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
|
||||
REQUEST_RAW_MGM_HK = "Request Raw MGM HK once"
|
||||
ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation"
|
||||
DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation"
|
||||
@@ -143,6 +182,15 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce.add(keys=OpCodes.SAFE, info=Info.SAFE)
|
||||
oce.add(keys=OpCodes.DTBL, info=Info.DTBL)
|
||||
oce.add(keys=OpCodes.IDLE, info=Info.IDLE)
|
||||
oce.add(keys=OpCodes.NADIR, info=Info.NADIR)
|
||||
oce.add(keys=OpCodes.TARGET, info=Info.TARGET)
|
||||
oce.add(keys=OpCodes.GS, info=Info.GS)
|
||||
oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL)
|
||||
oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG)
|
||||
oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF)
|
||||
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
|
||||
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
|
||||
oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX)
|
||||
oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK)
|
||||
oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK)
|
||||
oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK)
|
||||
@@ -194,6 +242,33 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
elif op_code in OpCodes.IDLE:
|
||||
q.add_log_cmd(f"{Info.IDLE}")
|
||||
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.IDLE))
|
||||
elif op_code in OpCodes.NADIR:
|
||||
q.add_log_cmd(f"{Info.NADIR}")
|
||||
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_NADIR))
|
||||
elif op_code in OpCodes.TARGET:
|
||||
q.add_log_cmd(f"{Info.TARGET}")
|
||||
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET))
|
||||
elif op_code in OpCodes.GS:
|
||||
q.add_log_cmd(f"{Info.GS}")
|
||||
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET_GS))
|
||||
elif op_code in OpCodes.INERTIAL:
|
||||
q.add_log_cmd(f"{Info.INERTIAL}")
|
||||
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_INERTIAL))
|
||||
elif op_code in OpCodes.SAFE_PTG:
|
||||
q.add_log_cmd(f"{Info.SAFE_PTG}")
|
||||
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL))
|
||||
elif op_code in OpCodes.RESET_MEKF:
|
||||
q.add_log_cmd(f"{Info.RESET_MEKF}")
|
||||
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF))
|
||||
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
|
||||
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
|
||||
set_acs_ctrl_param_scalar(q)
|
||||
elif op_code in OpCodes.SET_PARAMETER_VECTOR:
|
||||
q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}")
|
||||
set_acs_ctrl_param_vector(q)
|
||||
elif op_code in OpCodes.SET_PARAMETER_MATRIX:
|
||||
q.add_log_cmd(f"{Info.SET_PARAMETER_MATRIX}")
|
||||
set_acs_ctrl_param_matrix(q)
|
||||
elif op_code in OpCodes.REQUEST_RAW_MGM_HK:
|
||||
q.add_log_cmd(Info.REQUEST_RAW_MGM_HK)
|
||||
q.add_pus_tc(
|
||||
@@ -299,7 +374,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
elif op_code in OpCodes.ENABLE_PROC_GYR_HK:
|
||||
q.add_log_cmd(Info.ENABLE_PROC_GYR_HK)
|
||||
cmd_tuple = enable_periodic_hk_command_with_interval(
|
||||
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
|
||||
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
|
||||
)
|
||||
q.add_pus_tc(cmd_tuple[0])
|
||||
q.add_pus_tc(cmd_tuple[1])
|
||||
@@ -307,7 +382,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
q.add_log_cmd(Info.DISABLE_PROC_GYR_HK)
|
||||
q.add_pus_tc(
|
||||
disable_periodic_hk_command(
|
||||
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
|
||||
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
|
||||
)
|
||||
)
|
||||
elif op_code in OpCodes.REQUEST_PROC_GPS_HK:
|
||||
@@ -331,7 +406,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
)
|
||||
elif op_code in OpCodes.REQUEST_MEKF_HK:
|
||||
q.add_log_cmd(Info.REQUEST_MEKF_HK)
|
||||
q.add_pus_tc(generate_one_hk_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
|
||||
q.add_pus_tc(create_request_one_diag_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
|
||||
elif op_code in OpCodes.ENABLE_MEKF_HK:
|
||||
q.add_log_cmd(Info.ENABLE_MEKF_HK)
|
||||
cmd_tuple = enable_periodic_hk_command_with_interval(
|
||||
@@ -386,6 +461,177 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
||||
logging.getLogger(__name__).info(f"Unknown op code {op_code}")
|
||||
|
||||
|
||||
def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
|
||||
pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"int32\", 3: \"float\", "
|
||||
"4: \"double\"}: "))
|
||||
sid = int(input("Specify parameter struct ID to set: "))
|
||||
pid = int(input("Specify parameter ID to set: "))
|
||||
match pt:
|
||||
case 0:
|
||||
param = int(input("Specify parameter value to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_u8_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameter=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
case 1:
|
||||
param = int(input("Specify parameter value to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_u16_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameter=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
case 2:
|
||||
param = int(input("Specify parameter value to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_i32_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameter=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
case 3:
|
||||
param = float(input("Specify parameter value to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_float_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameter=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
case 4:
|
||||
param = float(input("Specify parameter value to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_double_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameter=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper):
|
||||
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
|
||||
sid = int(input("Specify parameter struct ID to set: "))
|
||||
pid = int(input("Specify parameter ID to set: "))
|
||||
match pt:
|
||||
case 0:
|
||||
elms = int(input("Specify number of elements in vector to set: "))
|
||||
param = []
|
||||
for _ in range(elms):
|
||||
param.append(float(input("Specify parameter vector entry value to set: ")))
|
||||
print(param)
|
||||
if input("Confirm selected parameter values (Y/N): ") == "Y":
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_vector_float_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameters=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
else:
|
||||
q.add_log_cmd("Aborted by user input")
|
||||
return
|
||||
case 1:
|
||||
elms = int(input("Specify number of elements in vector to set: "))
|
||||
param = []
|
||||
for _ in range(elms):
|
||||
param.append(float(input("Specify parameter vector entry value to set: ")))
|
||||
print(param)
|
||||
if input("Confirm selected parameter values (Y/N): ") == "Y":
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_vector_double_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameters=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
else:
|
||||
q.add_log_cmd("Aborted by user input")
|
||||
return
|
||||
|
||||
|
||||
def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
|
||||
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
|
||||
sid = int(input("Specify parameter struct ID to set: "))
|
||||
pid = int(input("Specify parameter ID to set: "))
|
||||
match pt:
|
||||
case 0:
|
||||
rows = int(input("Specify number of rows in matrix to set: "))
|
||||
cols = int(input("Specify number of columns in matrix to set: "))
|
||||
row = []
|
||||
param = []
|
||||
for _ in range(rows):
|
||||
for _ in range(cols):
|
||||
row.append(float(input("Specify parameter vector entry value to set: ")))
|
||||
param.append(row)
|
||||
print(param)
|
||||
if input("Confirm selected parameter values (Y/N): ") == "Y":
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_matrix_float_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameters=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
else:
|
||||
q.add_log_cmd("Aborted by user input")
|
||||
return
|
||||
case 1:
|
||||
rows = int(input("Specify number of rows in matrix to set: "))
|
||||
cols = int(input("Specify number of columns in matrix to set: "))
|
||||
row = []
|
||||
param = []
|
||||
for _ in range(rows):
|
||||
for _ in range(cols):
|
||||
row.append(float(input("Specify parameter vector entry value to set: ")))
|
||||
param.append(row)
|
||||
row = []
|
||||
print(param)
|
||||
if input("Confirm selected parameter values (Y/N): ") == "Y":
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_matrix_double_parameter(
|
||||
object_id=ACS_CONTROLLER,
|
||||
domain_id=sid,
|
||||
unique_id=pid,
|
||||
parameters=param,
|
||||
).pack()
|
||||
)
|
||||
)
|
||||
else:
|
||||
q.add_log_cmd("Aborted by user input")
|
||||
return
|
||||
|
||||
|
||||
def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||
pw = PrintWrapper(printer)
|
||||
match set_id:
|
||||
@@ -684,14 +930,18 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
|
||||
|
||||
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
||||
mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA",
|
||||
4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"}
|
||||
pw.dlog("Received MEKF Set")
|
||||
fmt_quat = "!dddd"
|
||||
fmt_str_4 = "[{:8.3f}, {:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
fmt_str_3 = "[{:8.3f}, {:8.3f}, {:8.3f}]"
|
||||
fmt_vec = "!ddd"
|
||||
fmt_sts = "!B"
|
||||
inc_len_quat = struct.calcsize(fmt_quat)
|
||||
inc_len_vec = struct.calcsize(fmt_vec)
|
||||
if len(hk_data) < inc_len_quat + inc_len_vec:
|
||||
inc_len_sts = struct.calcsize(fmt_sts)
|
||||
if len(hk_data) < inc_len_quat + inc_len_vec + inc_len_sts:
|
||||
pw.dlog("Received HK set too small")
|
||||
return
|
||||
current_idx = 0
|
||||
@@ -699,18 +949,23 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
||||
current_idx += inc_len_quat
|
||||
rate = struct.unpack(fmt_vec, hk_data[current_idx : current_idx + inc_len_vec])
|
||||
current_idx += inc_len_vec
|
||||
status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0]
|
||||
current_idx += inc_len_sts
|
||||
pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}")
|
||||
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
|
||||
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=2)
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
|
||||
|
||||
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog("Received CTRL Values Set")
|
||||
fmt_quat = "!dddd"
|
||||
fmt_scalar = "!d"
|
||||
fmt_vec = "!ddd"
|
||||
inc_len_quat = struct.calcsize(fmt_quat)
|
||||
inc_len_scalar = struct.calcsize(fmt_scalar)
|
||||
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar:
|
||||
inc_len_vec = struct.calcsize(fmt_vec)
|
||||
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar + inc_len_vec:
|
||||
pw.dlog("Received HK set too small")
|
||||
return
|
||||
current_idx = 0
|
||||
@@ -735,10 +990,18 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_scalar
|
||||
tgt_rot = [
|
||||
f"{val:8.3f}"
|
||||
for val in struct.unpack(
|
||||
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
|
||||
)
|
||||
]
|
||||
current_idx += inc_len_vec
|
||||
pw.dlog(f"Control Values Target Quaternion: {tgt_quat}")
|
||||
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
|
||||
pw.dlog(f"Control Values Error Angle: {err_ang} [rad]")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [rad/s]")
|
||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=4)
|
||||
|
||||
|
||||
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
|
||||
|
@@ -2,7 +2,10 @@ import enum
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from spacepackets.ecss import PusTelecommand
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_201_fsfw_health import pack_set_health_cmd_data, FsfwHealth
|
||||
from tmtccmd.pus.s201_fsfw_health import Subservice
|
||||
|
||||
import eive_tmtc.config.object_ids as obj_ids
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
|
||||
@@ -30,6 +33,7 @@ class OpCode:
|
||||
OFF = "off"
|
||||
CORE_HK = "core_hk"
|
||||
CFG_HK = "cfg_hk"
|
||||
SET_FAULTY = "set_faulty"
|
||||
|
||||
|
||||
class AdisGyroSetId(enum.IntEnum):
|
||||
@@ -85,6 +89,15 @@ def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
|
||||
q.add_pus_tc(
|
||||
create_request_one_hk_command(make_sid(gyr_obj_id, AdisGyroSetId.CFG_HK))
|
||||
)
|
||||
elif op_code == OpCode.SET_FAULTY:
|
||||
q.add_log_cmd(f"Gyro {gyr_info[0]} set faulty")
|
||||
q.add_pus_tc(
|
||||
PusTelecommand(
|
||||
service=201,
|
||||
subservice=Subservice.TC_SET_HEALTH,
|
||||
app_data=pack_set_health_cmd_data(object_id=gyr_obj_id, health=FsfwHealth.FAULTY)
|
||||
)
|
||||
)
|
||||
else:
|
||||
logging.getLogger(__name__).warning(
|
||||
f"invalid op code {op_code} for gyro command"
|
||||
@@ -177,4 +190,5 @@ def add_gyr_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce.add(keys=OpCode.CFG_HK, info="Request CFG HK")
|
||||
oce.add(keys=OpCode.NML, info="Normal Mode")
|
||||
oce.add(keys=OpCode.OFF, info="Off Mode")
|
||||
oce.add(keys=OpCode.SET_FAULTY, info="Set Faulty")
|
||||
defs.add_service(CustomServiceList.GYRO, info="Gyro", op_code_entry=oce)
|
||||
|
@@ -61,6 +61,7 @@ class ImtqSetId:
|
||||
NEGATIVE_Y_TEST = 13
|
||||
POSITIVE_Z_TEST = 14
|
||||
NEGATIVE_Z_TEST = 15
|
||||
SELF_TEST_SET = 16
|
||||
|
||||
|
||||
class ImtqActionId:
|
||||
@@ -307,7 +308,7 @@ ENG_HK_HEADERS = [
|
||||
|
||||
|
||||
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
|
||||
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
|
||||
if set_id == ImtqSetId.SELF_TEST_SET:
|
||||
return handle_self_test_data(printer, hk_data)
|
||||
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
|
||||
_LOGGER.info("Found engineering HK without torque")
|
||||
|
@@ -1,13 +1,25 @@
|
||||
import enum
|
||||
import struct
|
||||
|
||||
from eive_tmtc.config.definitions import CustomServiceList
|
||||
from tmtccmd.config import OpCodeEntry
|
||||
|
||||
import eive_tmtc.config.object_ids as obj_ids
|
||||
from eive_tmtc.config.object_ids import MGM_0_LIS3_HANDLER_ID, MGM_1_RM3100_HANDLER_ID, MGM_2_LIS3_HANDLER_ID, MGM_3_RM3100_HANDLER_ID
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper
|
||||
from tmtccmd.tc import DefaultPusQueueHelper
|
||||
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
|
||||
class OpCode:
|
||||
NORMAL = "normal"
|
||||
OFF = "off"
|
||||
|
||||
|
||||
class MgmLis3SetId(enum.IntEnum):
|
||||
CORE_HK = 0
|
||||
|
||||
@@ -16,6 +28,36 @@ class MgmRm3100SetId(enum.IntEnum):
|
||||
CORE_HK = 0
|
||||
|
||||
|
||||
class MgmSel(enum.IntEnum):
|
||||
MGM_0_LIS3 = 0
|
||||
MGM_1_RM3100 = 1
|
||||
MGM_2_LIS3 = 2
|
||||
MGM_3_RM3100 = 3
|
||||
|
||||
|
||||
MGM_SEL_DICT = {
|
||||
MgmSel.MGM_0_LIS3: ("MGM_0_LIS3", MGM_0_LIS3_HANDLER_ID),
|
||||
MgmSel.MGM_1_RM3100: ("MGM_1_RM3100", MGM_1_RM3100_HANDLER_ID),
|
||||
MgmSel.MGM_2_LIS3: ("MGM_2_LIS3", MGM_2_LIS3_HANDLER_ID),
|
||||
MgmSel.MGM_3_RM3100: ("MGM_3_RM3100", MGM_3_RM3100_HANDLER_ID),
|
||||
}
|
||||
|
||||
|
||||
def handle_mgm_cmd(q: DefaultPusQueueHelper, op_code: str):
|
||||
print("Please select the MGM Device")
|
||||
for (k, v) in MGM_SEL_DICT.items():
|
||||
print(f"{k}: {v[0]}")
|
||||
sel_idx = int(input("Select MGM device by index: "))
|
||||
mgm_info = MGM_SEL_DICT[MgmSel(sel_idx)]
|
||||
mgm_obj_id = mgm_info[1]
|
||||
if op_code == OpCode.NORMAL:
|
||||
q.add_log_cmd(f"Gyro {mgm_info[0]} NORMAL mode")
|
||||
q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.NORMAL, 0))
|
||||
if op_code == OpCode.OFF:
|
||||
q.add_log_cmd(f"Gyro {mgm_info[0]} OFF mode")
|
||||
q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.OFF, 0))
|
||||
|
||||
|
||||
def handle_mgm_hk_data(
|
||||
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
|
||||
):
|
||||
@@ -61,3 +103,11 @@ def handle_mgm_rm3100_hk_data(
|
||||
pw.dlog(
|
||||
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
|
||||
)
|
||||
|
||||
|
||||
@tmtc_definitions_provider
|
||||
def add_mgm_cmd_defs(defs: TmtcDefinitionWrapper):
|
||||
oce = OpCodeEntry()
|
||||
oce.add(keys=OpCode.NORMAL, info="Normal Mode")
|
||||
oce.add(keys=OpCode.OFF, info="Off Mode")
|
||||
defs.add_service(CustomServiceList.MGMS, info="MGMs", op_code_entry=oce)
|
||||
|
@@ -45,7 +45,7 @@ class SetId(enum.IntEnum):
|
||||
SWITCHER_SET = 0
|
||||
|
||||
|
||||
class PcduSetIds:
|
||||
class PcduSetIds(enum.IntEnum):
|
||||
SWITCHER_SET = 0
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user