Compare commits

...

24 Commits

Author SHA1 Message Date
481e57be59 Merge pull request 'acs-ctrl-update' (#140) from acs-ctrl-update into main
Reviewed-on: #140
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
2023-02-28 11:49:13 +01:00
039eed1e16 Merge branch 'main' into acs-ctrl-update 2023-02-28 11:04:44 +01:00
001270a1e5 changelog 2023-02-28 11:03:16 +01:00
b443867512 final fixes 2023-02-28 11:02:59 +01:00
9867ee873a Merge pull request 'Feature: MGM base cmds' (#143) from feature_mgm_base_cmds into main
Reviewed-on: #143
2023-02-28 10:11:00 +01:00
aa427151f5 changelog 2023-02-28 01:27:10 +01:00
0ce32521f4 mgm base cmds 2023-02-28 01:24:47 +01:00
649c019eb8 Merge pull request 'tmtc faulty gyor cmd' (#142) from kranz_tmtc_faulty_gyro_cmd into main
Reviewed-on: #142
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
2023-02-27 18:20:58 +01:00
fa5c24df6b Merge branch 'main' into kranz_tmtc_faulty_gyro_cmd 2023-02-27 18:20:49 +01:00
a13454851b tmtc faulty gyor cmd 2023-02-27 18:22:57 +01:00
78b55f919c Merge pull request 'int enum' (#141) from kranz_enum into main
Reviewed-on: #141
2023-02-27 16:58:18 +01:00
3259cf0807 self test intro 2023-02-27 15:24:47 +01:00
8919f5c8e9 self test set for easier operator handling 2023-02-27 15:16:20 +01:00
736bc23fc5 added command to set matrix parameter 2023-02-27 15:04:02 +01:00
02797ffc5b int enum 2023-02-27 14:57:57 +01:00
6a30f6d0ac added commands to change scalar and vector parameters 2023-02-27 14:01:54 +01:00
11454f6015 Merge branch 'main' into acs-ctrl-update 2023-02-27 09:23:33 +01:00
3f71475860 uptd 2023-02-24 15:28:58 +01:00
7120f14601 Merge branch 'main' into acs-ctrl-update 2023-02-24 15:27:55 +01:00
7a82a77876 changelog 2023-02-24 15:24:57 +01:00
23aeb8bcf9 added action commands 2023-02-24 15:06:54 +01:00
bc9f13f296 diag datasets and mekf update 2023-02-24 14:53:09 +01:00
09dbc6b14d updated handle_mekf_data 2023-02-24 14:01:39 +01:00
31d51e3660 added commands for missing submodes 2023-02-24 13:56:50 +01:00
8 changed files with 356 additions and 13 deletions

View File

@@ -10,9 +10,20 @@ list yields a list of all related PRs for each release.
# [unreleased]
## Added
- Added ACS action cmds
- Added new ACS hk values
- Added ACS set parameter cmds
## Fixed
- Correction for ACS CTRL raw data requests HK type
- Fixed diag related ACS hk cmds
## Added
- Basic MGM commanding (modes)
# [v2.16.1] 2023-02-24
@@ -312,4 +323,4 @@ tmtccmd v4.0.0rc0
- Extended heater commands for more informative output which component is heated
See [milestones](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/milestones)
and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases)
and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases)

View File

@@ -37,6 +37,7 @@ class CustomServiceList(str, enum.Enum):
ACU = "acu"
ACS = "acs"
GYRO = "gyro"
MGMS = "mgms"
COM_SS = "com"
BPX_BATTERY = "bpx"
HEATER = "heater"

View File

@@ -4,6 +4,7 @@ import logging
from typing import cast
from eive_tmtc.tmtc.acs.gyros import handle_gyr_cmd
from eive_tmtc.tmtc.acs.mgms import handle_mgm_cmd
from eive_tmtc.tmtc.power.power import pack_power_commands
from eive_tmtc.tmtc.tcs.rtd import pack_rtd_commands
from eive_tmtc.tmtc.payload.scex import pack_scex_cmds
@@ -135,6 +136,8 @@ def handle_default_procedure(
return pack_single_rw_test_into(
object_id=RW4_ID, rw_idx=4, q=queue_helper, op_code=op_code
)
if service == CustomServiceList.MGMS.value:
return handle_mgm_cmd(q=queue_helper, op_code=op_code)
if service == CustomServiceList.RAD_SENSOR.value:
object_id = cast(ObjectIdU32, obj_id_man.get(RAD_SENSOR_ID))
return pack_rad_sensor_test_into(

View File

@@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import (
OpCodeEntry,
)
from tmtccmd.tc import service_provider
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_3_fsfw_hk import (
@@ -23,8 +24,25 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
disable_periodic_hk_command,
create_request_one_diag_command,
)
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.tc.pus_20_fsfw_param import (
create_load_param_cmd
)
from tmtccmd.pus.s20_fsfw_param_defs import (
create_scalar_u8_parameter,
create_scalar_u16_parameter,
create_scalar_i32_parameter,
create_scalar_float_parameter,
create_scalar_double_parameter,
create_vector_float_parameter,
create_vector_double_parameter,
create_matrix_float_parameter,
create_matrix_double_parameter,
)
class SetId(enum.IntEnum):
MGM_RAW_SET = 0
@@ -49,12 +67,24 @@ class Submode(enum.IntEnum):
PTG_TARGET_GS = 15
PTG_INERTIAL = 16
class ActionId(enum.IntEnum):
SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL = 0
RESET_MEKF = 1
class OpCodes:
OFF = ["off"]
SAFE = ["normal_safe"]
DTBL = ["normal_detumble"]
IDLE = ["normal_idle"]
NADIR = ["normal_nadir"]
TARGET = ["normal_target"]
GS = ["normal_gs"]
INERTIAL = ["normal_inertial"]
SAFE_PTG = ["confirm_deployment"]
RESET_MEKF = ["reset_mekf"]
SET_PARAMETER_SCALAR = ["set_scalar_param"]
SET_PARAMETER_VECTOR = ["set_vector_param"]
SET_PARAMETER_MATRIX = ["set_matrix_param"]
REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"]
ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"]
DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"]
@@ -89,9 +119,18 @@ class OpCodes:
class Info:
OFF = "Switch ACS CTRL off"
SAFE = "Switch ACS CTRL normal safe"
DTBL = "Switch ACS CTRL normal detumble"
IDLE = "Switch ACS CTRL normal idle"
SAFE = "Switch ACS CTRL normal - safe"
DTBL = "Switch ACS CTRL normal - detumble"
IDLE = "Switch ACS CTRL normal - idle"
NADIR = "Switch ACS CTRL normal - pointing nadir"
TARGET = "Switch ACS CTRL normal - pointing target"
GS = "Switch ACS CTRL normal - pointing target groundstation"
INERTIAL = "Switch ACS CTRL normal - pointing inertial"
SAFE_PTG = "Confirm deployment of both solar arrays"
RESET_MEKF = "Reset the MEKF"
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
SET_PARAMETER_VECTOR = "Set Vector Parameter"
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
REQUEST_RAW_MGM_HK = "Request Raw MGM HK once"
ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation"
DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation"
@@ -143,6 +182,15 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCodes.SAFE, info=Info.SAFE)
oce.add(keys=OpCodes.DTBL, info=Info.DTBL)
oce.add(keys=OpCodes.IDLE, info=Info.IDLE)
oce.add(keys=OpCodes.NADIR, info=Info.NADIR)
oce.add(keys=OpCodes.TARGET, info=Info.TARGET)
oce.add(keys=OpCodes.GS, info=Info.GS)
oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL)
oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG)
oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF)
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX)
oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK)
oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK)
oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK)
@@ -194,6 +242,33 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
elif op_code in OpCodes.IDLE:
q.add_log_cmd(f"{Info.IDLE}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.IDLE))
elif op_code in OpCodes.NADIR:
q.add_log_cmd(f"{Info.NADIR}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_NADIR))
elif op_code in OpCodes.TARGET:
q.add_log_cmd(f"{Info.TARGET}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET))
elif op_code in OpCodes.GS:
q.add_log_cmd(f"{Info.GS}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET_GS))
elif op_code in OpCodes.INERTIAL:
q.add_log_cmd(f"{Info.INERTIAL}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_INERTIAL))
elif op_code in OpCodes.SAFE_PTG:
q.add_log_cmd(f"{Info.SAFE_PTG}")
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL))
elif op_code in OpCodes.RESET_MEKF:
q.add_log_cmd(f"{Info.RESET_MEKF}")
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF))
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
set_acs_ctrl_param_scalar(q)
elif op_code in OpCodes.SET_PARAMETER_VECTOR:
q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}")
set_acs_ctrl_param_vector(q)
elif op_code in OpCodes.SET_PARAMETER_MATRIX:
q.add_log_cmd(f"{Info.SET_PARAMETER_MATRIX}")
set_acs_ctrl_param_matrix(q)
elif op_code in OpCodes.REQUEST_RAW_MGM_HK:
q.add_log_cmd(Info.REQUEST_RAW_MGM_HK)
q.add_pus_tc(
@@ -299,7 +374,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
elif op_code in OpCodes.ENABLE_PROC_GYR_HK:
q.add_log_cmd(Info.ENABLE_PROC_GYR_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
)
q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1])
@@ -307,7 +382,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
q.add_log_cmd(Info.DISABLE_PROC_GYR_HK)
q.add_pus_tc(
disable_periodic_hk_command(
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
)
)
elif op_code in OpCodes.REQUEST_PROC_GPS_HK:
@@ -331,7 +406,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
)
elif op_code in OpCodes.REQUEST_MEKF_HK:
q.add_log_cmd(Info.REQUEST_MEKF_HK)
q.add_pus_tc(generate_one_hk_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
q.add_pus_tc(create_request_one_diag_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
elif op_code in OpCodes.ENABLE_MEKF_HK:
q.add_log_cmd(Info.ENABLE_MEKF_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
@@ -386,6 +461,177 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
logging.getLogger(__name__).info(f"Unknown op code {op_code}")
def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"int32\", 3: \"float\", "
"4: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u8_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 1:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u16_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 2:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_i32_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 3:
param = float(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 4:
param = float(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
elms = int(input("Specify number of elements in vector to set: "))
param = []
for _ in range(elms):
param.append(float(input("Specify parameter vector entry value to set: ")))
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_vector_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
case 1:
elms = int(input("Specify number of elements in vector to set: "))
param = []
for _ in range(elms):
param.append(float(input("Specify parameter vector entry value to set: ")))
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_vector_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
rows = int(input("Specify number of rows in matrix to set: "))
cols = int(input("Specify number of columns in matrix to set: "))
row = []
param = []
for _ in range(rows):
for _ in range(cols):
row.append(float(input("Specify parameter vector entry value to set: ")))
param.append(row)
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_matrix_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
case 1:
rows = int(input("Specify number of rows in matrix to set: "))
cols = int(input("Specify number of columns in matrix to set: "))
row = []
param = []
for _ in range(rows):
for _ in range(cols):
row.append(float(input("Specify parameter vector entry value to set: ")))
param.append(row)
row = []
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_matrix_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
match set_id:
@@ -684,14 +930,18 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA",
4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"}
pw.dlog("Received MEKF Set")
fmt_quat = "!dddd"
fmt_str_4 = "[{:8.3f}, {:8.3f}, {:8.3f}, {:8.3f}]"
fmt_str_3 = "[{:8.3f}, {:8.3f}, {:8.3f}]"
fmt_vec = "!ddd"
fmt_sts = "!B"
inc_len_quat = struct.calcsize(fmt_quat)
inc_len_vec = struct.calcsize(fmt_vec)
if len(hk_data) < inc_len_quat + inc_len_vec:
inc_len_sts = struct.calcsize(fmt_sts)
if len(hk_data) < inc_len_quat + inc_len_vec + inc_len_sts:
pw.dlog("Received HK set too small")
return
current_idx = 0
@@ -699,18 +949,23 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len_quat
rate = struct.unpack(fmt_vec, hk_data[current_idx : current_idx + inc_len_vec])
current_idx += inc_len_vec
status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0]
current_idx += inc_len_sts
pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}")
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=2)
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog("Received CTRL Values Set")
fmt_quat = "!dddd"
fmt_scalar = "!d"
fmt_vec = "!ddd"
inc_len_quat = struct.calcsize(fmt_quat)
inc_len_scalar = struct.calcsize(fmt_scalar)
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar:
inc_len_vec = struct.calcsize(fmt_vec)
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar + inc_len_vec:
pw.dlog("Received HK set too small")
return
current_idx = 0
@@ -735,10 +990,18 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
)
]
current_idx += inc_len_scalar
tgt_rot = [
f"{val:8.3f}"
for val in struct.unpack(
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
)
]
current_idx += inc_len_vec
pw.dlog(f"Control Values Target Quaternion: {tgt_quat}")
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [rad]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [rad/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=4)
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):

View File

@@ -2,7 +2,10 @@ import enum
import logging
import struct
from spacepackets.ecss import PusTelecommand
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_201_fsfw_health import pack_set_health_cmd_data, FsfwHealth
from tmtccmd.pus.s201_fsfw_health import Subservice
import eive_tmtc.config.object_ids as obj_ids
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
@@ -30,6 +33,7 @@ class OpCode:
OFF = "off"
CORE_HK = "core_hk"
CFG_HK = "cfg_hk"
SET_FAULTY = "set_faulty"
class AdisGyroSetId(enum.IntEnum):
@@ -85,6 +89,15 @@ def handle_gyr_cmd(q: DefaultPusQueueHelper, op_code: str):
q.add_pus_tc(
create_request_one_hk_command(make_sid(gyr_obj_id, AdisGyroSetId.CFG_HK))
)
elif op_code == OpCode.SET_FAULTY:
q.add_log_cmd(f"Gyro {gyr_info[0]} set faulty")
q.add_pus_tc(
PusTelecommand(
service=201,
subservice=Subservice.TC_SET_HEALTH,
app_data=pack_set_health_cmd_data(object_id=gyr_obj_id, health=FsfwHealth.FAULTY)
)
)
else:
logging.getLogger(__name__).warning(
f"invalid op code {op_code} for gyro command"
@@ -177,4 +190,5 @@ def add_gyr_cmd_defs(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCode.CFG_HK, info="Request CFG HK")
oce.add(keys=OpCode.NML, info="Normal Mode")
oce.add(keys=OpCode.OFF, info="Off Mode")
oce.add(keys=OpCode.SET_FAULTY, info="Set Faulty")
defs.add_service(CustomServiceList.GYRO, info="Gyro", op_code_entry=oce)

View File

@@ -61,6 +61,7 @@ class ImtqSetId:
NEGATIVE_Y_TEST = 13
POSITIVE_Z_TEST = 14
NEGATIVE_Z_TEST = 15
SELF_TEST_SET = 16
class ImtqActionId:
@@ -307,7 +308,7 @@ ENG_HK_HEADERS = [
def handle_imtq_hk(printer: FsfwTmTcPrinter, hk_data: bytes, set_id: int):
if (set_id >= ImtqSetId.POSITIVE_X_TEST) and (set_id <= ImtqSetId.NEGATIVE_Z_TEST):
if set_id == ImtqSetId.SELF_TEST_SET:
return handle_self_test_data(printer, hk_data)
elif set_id == ImtqSetId.ENG_HK_NO_TORQUE:
_LOGGER.info("Found engineering HK without torque")

View File

@@ -1,13 +1,25 @@
import enum
import struct
from eive_tmtc.config.definitions import CustomServiceList
from tmtccmd.config import OpCodeEntry
import eive_tmtc.config.object_ids as obj_ids
from eive_tmtc.config.object_ids import MGM_0_LIS3_HANDLER_ID, MGM_1_RM3100_HANDLER_ID, MGM_2_LIS3_HANDLER_ID, MGM_3_RM3100_HANDLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.config.tmtc import tmtc_definitions_provider, TmtcDefinitionWrapper
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import create_mode_command, Mode
from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class OpCode:
NORMAL = "normal"
OFF = "off"
class MgmLis3SetId(enum.IntEnum):
CORE_HK = 0
@@ -16,6 +28,36 @@ class MgmRm3100SetId(enum.IntEnum):
CORE_HK = 0
class MgmSel(enum.IntEnum):
MGM_0_LIS3 = 0
MGM_1_RM3100 = 1
MGM_2_LIS3 = 2
MGM_3_RM3100 = 3
MGM_SEL_DICT = {
MgmSel.MGM_0_LIS3: ("MGM_0_LIS3", MGM_0_LIS3_HANDLER_ID),
MgmSel.MGM_1_RM3100: ("MGM_1_RM3100", MGM_1_RM3100_HANDLER_ID),
MgmSel.MGM_2_LIS3: ("MGM_2_LIS3", MGM_2_LIS3_HANDLER_ID),
MgmSel.MGM_3_RM3100: ("MGM_3_RM3100", MGM_3_RM3100_HANDLER_ID),
}
def handle_mgm_cmd(q: DefaultPusQueueHelper, op_code: str):
print("Please select the MGM Device")
for (k, v) in MGM_SEL_DICT.items():
print(f"{k}: {v[0]}")
sel_idx = int(input("Select MGM device by index: "))
mgm_info = MGM_SEL_DICT[MgmSel(sel_idx)]
mgm_obj_id = mgm_info[1]
if op_code == OpCode.NORMAL:
q.add_log_cmd(f"Gyro {mgm_info[0]} NORMAL mode")
q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.NORMAL, 0))
if op_code == OpCode.OFF:
q.add_log_cmd(f"Gyro {mgm_info[0]} OFF mode")
q.add_pus_tc(create_mode_command(mgm_obj_id, Mode.OFF, 0))
def handle_mgm_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
@@ -61,3 +103,11 @@ def handle_mgm_rm3100_hk_data(
pw.dlog(
f"Field strengths in micro Tesla X {field_x} | Y {field_y} | Z {field_z}"
)
@tmtc_definitions_provider
def add_mgm_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCode.NORMAL, info="Normal Mode")
oce.add(keys=OpCode.OFF, info="Off Mode")
defs.add_service(CustomServiceList.MGMS, info="MGMs", op_code_entry=oce)

View File

@@ -45,7 +45,7 @@ class SetId(enum.IntEnum):
SWITCHER_SET = 0
class PcduSetIds:
class PcduSetIds(enum.IntEnum):
SWITCHER_SET = 0