acs-ctrl-update #140
@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import (
|
|||||||
OpCodeEntry,
|
OpCodeEntry,
|
||||||
)
|
)
|
||||||
from tmtccmd.tc import service_provider
|
from tmtccmd.tc import service_provider
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
|
from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command
|
||||||
from tmtccmd.tc.decorator import ServiceProviderParams
|
from tmtccmd.tc.decorator import ServiceProviderParams
|
||||||
from tmtccmd.tc.pus_3_fsfw_hk import (
|
from tmtccmd.tc.pus_3_fsfw_hk import (
|
||||||
@ -26,6 +27,20 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
|
|||||||
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd
|
||||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||||
|
|
||||||
|
from tmtccmd.tc.pus_20_fsfw_param import (
|
||||||
|
create_load_param_cmd
|
||||||
|
)
|
||||||
|
|
||||||
|
from tmtccmd.pus.s20_fsfw_param_defs import (
|
||||||
|
create_scalar_u8_parameter,
|
||||||
|
create_scalar_u16_parameter,
|
||||||
|
create_scalar_u32_parameter,
|
||||||
|
create_scalar_float_parameter,
|
||||||
|
create_scalar_double_parameter,
|
||||||
|
create_vector_float_parameter,
|
||||||
|
create_vector_double_parameter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SetId(enum.IntEnum):
|
class SetId(enum.IntEnum):
|
||||||
MGM_RAW_SET = 0
|
MGM_RAW_SET = 0
|
||||||
@ -65,6 +80,8 @@ class OpCodes:
|
|||||||
INERTIAL = ["normal_inertial"]
|
INERTIAL = ["normal_inertial"]
|
||||||
SAFE_PTG = ["confirm_deployment"]
|
SAFE_PTG = ["confirm_deployment"]
|
||||||
RESET_MEKF = ["reset_mekf"]
|
RESET_MEKF = ["reset_mekf"]
|
||||||
|
SET_PARAMETER_SCALAR = ["set_scalar_param"]
|
||||||
|
SET_PARAMETER_VECTOR = ["set_vector_param"]
|
||||||
REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"]
|
REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"]
|
||||||
ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"]
|
ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"]
|
||||||
DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"]
|
DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"]
|
||||||
@ -108,6 +125,9 @@ class Info:
|
|||||||
INERTIAL = "Switch ACS CTRL normal - pointing inertial"
|
INERTIAL = "Switch ACS CTRL normal - pointing inertial"
|
||||||
SAFE_PTG = "Confirm deployment of both solar arrays"
|
SAFE_PTG = "Confirm deployment of both solar arrays"
|
||||||
RESET_MEKF = "Reset the MEKF"
|
RESET_MEKF = "Reset the MEKF"
|
||||||
|
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
|
||||||
|
SET_PARAMETER_VECTOR = "Set Vector Parameter"
|
||||||
|
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
|
||||||
REQUEST_RAW_MGM_HK = "Request Raw MGM HK once"
|
REQUEST_RAW_MGM_HK = "Request Raw MGM HK once"
|
||||||
ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation"
|
ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation"
|
||||||
DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation"
|
DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation"
|
||||||
@ -165,6 +185,8 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
|
|||||||
oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL)
|
oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL)
|
||||||
oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG)
|
oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG)
|
||||||
oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF)
|
oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF)
|
||||||
|
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
|
||||||
|
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
|
||||||
oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK)
|
oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK)
|
||||||
oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK)
|
oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK)
|
||||||
oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK)
|
oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK)
|
||||||
@ -234,6 +256,12 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
|||||||
elif op_code in OpCodes.RESET_MEKF:
|
elif op_code in OpCodes.RESET_MEKF:
|
||||||
q.add_log_cmd(f"{Info.RESET_MEKF}")
|
q.add_log_cmd(f"{Info.RESET_MEKF}")
|
||||||
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF))
|
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF))
|
||||||
|
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
|
||||||
|
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
|
||||||
|
set_acs_ctrl_param_scalar(q)
|
||||||
|
elif op_code in OpCodes.SET_PARAMETER_VECTOR:
|
||||||
|
q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}")
|
||||||
|
set_acs_ctrl_param_vector(q)
|
||||||
elif op_code in OpCodes.REQUEST_RAW_MGM_HK:
|
elif op_code in OpCodes.REQUEST_RAW_MGM_HK:
|
||||||
q.add_log_cmd(Info.REQUEST_RAW_MGM_HK)
|
q.add_log_cmd(Info.REQUEST_RAW_MGM_HK)
|
||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
@ -426,6 +454,121 @@ def pack_acs_ctrl_command(p: ServiceProviderParams):
|
|||||||
logging.getLogger(__name__).info(f"Unknown op code {op_code}")
|
logging.getLogger(__name__).info(f"Unknown op code {op_code}")
|
||||||
|
|
||||||
|
|
||||||
|
def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper):
|
||||||
|
pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"uint32\", 3: \"float\", "
|
||||||
|
"4: \"double\"}: "))
|
||||||
|
sid = int(input("Specify parameter struct ID to set: "))
|
||||||
|
pid = int(input("Specify parameter ID to set: "))
|
||||||
|
match pt:
|
||||||
|
case 0:
|
||||||
|
param = int(input("Specify parameter value to set: "))
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_scalar_u8_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameter=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
case 1:
|
||||||
|
param = int(input("Specify parameter value to set: "))
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_scalar_u16_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameter=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
case 2:
|
||||||
|
param = int(input("Specify parameter value to set: "))
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_scalar_u32_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameter=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
case 3:
|
||||||
|
param = float(input("Specify parameter value to set: "))
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_scalar_float_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameter=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
case 4:
|
||||||
|
param = float(input("Specify parameter value to set: "))
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_scalar_double_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameter=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper):
|
||||||
|
pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: "))
|
||||||
|
sid = int(input("Specify parameter struct ID to set: "))
|
||||||
|
pid = int(input("Specify parameter ID to set: "))
|
||||||
|
match pt:
|
||||||
|
case 0:
|
||||||
|
nr = int(input("Specify number of elements in vector to set: "))
|
||||||
|
param = []
|
||||||
|
for _ in range(nr):
|
||||||
|
param.append(float(input("Specify parameter vector entry value to set: ")))
|
||||||
|
print(param)
|
||||||
|
if bool(input("Confirm selected parameter values (Y/N): ")):
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_vector_float_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameters=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("Aborting")
|
||||||
|
return
|
||||||
|
case 1:
|
||||||
|
nr = int(input("Specify number of elements in vector to set: "))
|
||||||
|
param = []
|
||||||
|
for _ in range(nr):
|
||||||
|
param.append(float(input("Specify parameter vector entry value to set: ")))
|
||||||
|
print(param)
|
||||||
|
if bool(input("Confirm selected parameter values (Y/N): ")):
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_load_param_cmd(
|
||||||
|
create_vector_double_parameter(
|
||||||
|
object_id=ACS_CONTROLLER,
|
||||||
|
domain_id=sid,
|
||||||
|
unique_id=pid,
|
||||||
|
parameters=param,
|
||||||
|
).pack()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("Aborting")
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||||
pw = PrintWrapper(printer)
|
pw = PrintWrapper(printer)
|
||||||
match set_id:
|
match set_id:
|
||||||
@ -724,7 +867,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
|||||||
|
|
||||||
|
|
||||||
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
||||||
mekfStatus = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA",
|
mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA",
|
||||||
4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"}
|
4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"}
|
||||||
pw.dlog("Received MEKF Set")
|
pw.dlog("Received MEKF Set")
|
||||||
fmt_quat = "!dddd"
|
fmt_quat = "!dddd"
|
||||||
@ -745,7 +888,7 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes):
|
|||||||
current_idx += inc_len_vec
|
current_idx += inc_len_vec
|
||||||
status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0]
|
status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0]
|
||||||
current_idx += inc_len_sts
|
current_idx += inc_len_sts
|
||||||
pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekfStatus[status]}")
|
pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}")
|
||||||
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
|
pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}")
|
||||||
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
|
pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}")
|
||||||
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
|
pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||||
|
Loading…
Reference in New Issue
Block a user