Merge pull request 'acs-ctrl-update' (#140) from acs-ctrl-update into main

Reviewed-on: #140
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
This commit is contained in:
Robin Müller 2023-02-28 11:49:13 +01:00
commit 481e57be59
2 changed files with 281 additions and 11 deletions

View File

@ -10,9 +10,16 @@ list yields a list of all related PRs for each release.
# [unreleased]
## Added
- Added ACS action cmds
- Added new ACS hk values
- Added ACS set parameter cmds
## Fixed
- Correction for ACS CTRL raw data requests HK type
- Fixed diag related ACS hk cmds
## Added

View File

@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import (
OpCodeEntry,
)
from tmtccmd.tc import service_provider
from tmtccmd.tc.queue import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
from tmtccmd.tc.decorator import ServiceProviderParams
from tmtccmd.tc.pus_3_fsfw_hk import (
@ -23,8 +24,25 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
disable_periodic_hk_command,
create_request_one_diag_command,
)
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.tc.pus_20_fsfw_param import (
create_load_param_cmd
)
from tmtccmd.pus.s20_fsfw_param_defs import (
create_scalar_u8_parameter,
create_scalar_u16_parameter,
create_scalar_i32_parameter,
create_scalar_float_parameter,
create_scalar_double_parameter,
create_vector_float_parameter,
create_vector_double_parameter,
create_matrix_float_parameter,
create_matrix_double_parameter,
)
class SetId(enum.IntEnum):
MGM_RAW_SET = 0
@ -49,12 +67,24 @@ class Submode(enum.IntEnum):
PTG_TARGET_GS = 15
PTG_INERTIAL = 16
class ActionId(enum.IntEnum):
SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL = 0
RESET_MEKF = 1
class OpCodes:
OFF = ["off"]
SAFE = ["normal_safe"]
DTBL = ["normal_detumble"]
IDLE = ["normal_idle"]
NADIR = ["normal_nadir"]
TARGET = ["normal_target"]
GS = ["normal_gs"]
INERTIAL = ["normal_inertial"]
SAFE_PTG = ["confirm_deployment"]
RESET_MEKF = ["reset_mekf"]
SET_PARAMETER_SCALAR = ["set_scalar_param"]
SET_PARAMETER_VECTOR = ["set_vector_param"]
SET_PARAMETER_MATRIX = ["set_matrix_param"]
REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"]
ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"]
DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"]
@ -89,9 +119,18 @@ class OpCodes:
class Info:
OFF = "Switch ACS CTRL off"
SAFE = "Switch ACS CTRL normal safe"
DTBL = "Switch ACS CTRL normal detumble"
IDLE = "Switch ACS CTRL normal idle"
SAFE = "Switch ACS CTRL normal - safe"
DTBL = "Switch ACS CTRL normal - detumble"
IDLE = "Switch ACS CTRL normal - idle"
NADIR = "Switch ACS CTRL normal - pointing nadir"
TARGET = "Switch ACS CTRL normal - pointing target"
GS = "Switch ACS CTRL normal - pointing target groundstation"
INERTIAL = "Switch ACS CTRL normal - pointing inertial"
SAFE_PTG = "Confirm deployment of both solar arrays"
RESET_MEKF = "Reset the MEKF"
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
SET_PARAMETER_VECTOR = "Set Vector Parameter"
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
REQUEST_RAW_MGM_HK = "Request Raw MGM HK once"
ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation"
DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation"
@ -143,6 +182,15 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCodes.SAFE, info=Info.SAFE)
oce.add(keys=OpCodes.DTBL, info=Info.DTBL)
oce.add(keys=OpCodes.IDLE, info=Info.IDLE)
oce.add(keys=OpCodes.NADIR, info=Info.NADIR)
oce.add(keys=OpCodes.TARGET, info=Info.TARGET)
oce.add(keys=OpCodes.GS, info=Info.GS)
oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL)
oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG)
oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF)
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX)
oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK)
oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK)
oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK)
@ -194,6 +242,33 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
elif op_code in OpCodes.IDLE:
q.add_log_cmd(f"{Info.IDLE}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.IDLE))
elif op_code in OpCodes.NADIR:
q.add_log_cmd(f"{Info.NADIR}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_NADIR))
elif op_code in OpCodes.TARGET:
q.add_log_cmd(f"{Info.TARGET}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET))
elif op_code in OpCodes.GS:
q.add_log_cmd(f"{Info.GS}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET_GS))
elif op_code in OpCodes.INERTIAL:
q.add_log_cmd(f"{Info.INERTIAL}")
q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_INERTIAL))
elif op_code in OpCodes.SAFE_PTG:
q.add_log_cmd(f"{Info.SAFE_PTG}")
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL))
elif op_code in OpCodes.RESET_MEKF:
q.add_log_cmd(f"{Info.RESET_MEKF}")
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF))
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
set_acs_ctrl_param_scalar(q)
elif op_code in OpCodes.SET_PARAMETER_VECTOR:
q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}")
set_acs_ctrl_param_vector(q)
elif op_code in OpCodes.SET_PARAMETER_MATRIX:
q.add_log_cmd(f"{Info.SET_PARAMETER_MATRIX}")
set_acs_ctrl_param_matrix(q)
elif op_code in OpCodes.REQUEST_RAW_MGM_HK:
q.add_log_cmd(Info.REQUEST_RAW_MGM_HK)
q.add_pus_tc(
@ -299,7 +374,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
elif op_code in OpCodes.ENABLE_PROC_GYR_HK:
q.add_log_cmd(Info.ENABLE_PROC_GYR_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0
)
q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1])
@ -307,7 +382,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
q.add_log_cmd(Info.DISABLE_PROC_GYR_HK)
q.add_pus_tc(
disable_periodic_hk_command(
False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET)
)
)
elif op_code in OpCodes.REQUEST_PROC_GPS_HK:
@ -331,7 +406,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
)
elif op_code in OpCodes.REQUEST_MEKF_HK:
q.add_log_cmd(Info.REQUEST_MEKF_HK)
q.add_pus_tc(generate_one_hk_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
q.add_pus_tc(create_request_one_diag_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA)))
elif op_code in OpCodes.ENABLE_MEKF_HK:
q.add_log_cmd(Info.ENABLE_MEKF_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(
@ -386,6 +461,177 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
logging.getLogger(__name__).info(f"Unknown op code {op_code}")
def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"int32\", 3: \"float\", "
"4: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u8_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 1:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_u16_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 2:
param = int(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_i32_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 3:
param = float(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
case 4:
param = float(input("Specify parameter value to set: "))
q.add_pus_tc(
create_load_param_cmd(
create_scalar_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameter=param,
).pack()
)
)
def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
elms = int(input("Specify number of elements in vector to set: "))
param = []
for _ in range(elms):
param.append(float(input("Specify parameter vector entry value to set: ")))
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_vector_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
case 1:
elms = int(input("Specify number of elements in vector to set: "))
param = []
for _ in range(elms):
param.append(float(input("Specify parameter vector entry value to set: ")))
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_vector_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper):
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
sid = int(input("Specify parameter struct ID to set: "))
pid = int(input("Specify parameter ID to set: "))
match pt:
case 0:
rows = int(input("Specify number of rows in matrix to set: "))
cols = int(input("Specify number of columns in matrix to set: "))
row = []
param = []
for _ in range(rows):
for _ in range(cols):
row.append(float(input("Specify parameter vector entry value to set: ")))
param.append(row)
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_matrix_float_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
case 1:
rows = int(input("Specify number of rows in matrix to set: "))
cols = int(input("Specify number of columns in matrix to set: "))
row = []
param = []
for _ in range(rows):
for _ in range(cols):
row.append(float(input("Specify parameter vector entry value to set: ")))
param.append(row)
row = []
print(param)
if input("Confirm selected parameter values (Y/N): ") == "Y":
q.add_pus_tc(
create_load_param_cmd(
create_matrix_double_parameter(
object_id=ACS_CONTROLLER,
domain_id=sid,
unique_id=pid,
parameters=param,
).pack()
)
)
else:
q.add_log_cmd("Aborted by user input")
return
def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
pw = PrintWrapper(printer)
match set_id:
@ -684,14 +930,18 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA",
4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"}
pw.dlog("Received MEKF Set")
fmt_quat = "!dddd"
fmt_str_4 = "[{:8.3f}, {:8.3f}, {:8.3f}, {:8.3f}]"
fmt_str_3 = "[{:8.3f}, {:8.3f}, {:8.3f}]"
fmt_vec = "!ddd"
fmt_sts = "!B"
inc_len_quat = struct.calcsize(fmt_quat)
inc_len_vec = struct.calcsize(fmt_vec)
if len(hk_data) < inc_len_quat + inc_len_vec:
inc_len_sts = struct.calcsize(fmt_sts)
if len(hk_data) < inc_len_quat + inc_len_vec + inc_len_sts:
pw.dlog("Received HK set too small")
return
current_idx = 0
@ -699,18 +949,23 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len_quat
rate = struct.unpack(fmt_vec, hk_data[current_idx : current_idx + inc_len_vec])
current_idx += inc_len_vec
status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0]
current_idx += inc_len_sts
pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}")
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=2)
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog("Received CTRL Values Set")
fmt_quat = "!dddd"
fmt_scalar = "!d"
fmt_vec = "!ddd"
inc_len_quat = struct.calcsize(fmt_quat)
inc_len_scalar = struct.calcsize(fmt_scalar)
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar:
inc_len_vec = struct.calcsize(fmt_vec)
if len(hk_data) < 2 * inc_len_quat + inc_len_scalar + inc_len_vec:
pw.dlog("Received HK set too small")
return
current_idx = 0
@ -735,10 +990,18 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
)
]
current_idx += inc_len_scalar
tgt_rot = [
f"{val:8.3f}"
for val in struct.unpack(
fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]
)
]
current_idx += inc_len_vec
pw.dlog(f"Control Values Target Quaternion: {tgt_quat}")
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [rad]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [rad/s]")
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=4)
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):