New ACS Stuff #263
@ -65,6 +65,7 @@ class ActionId(enum.IntEnum):
|
|||||||
RESET_MEKF = 1
|
RESET_MEKF = 1
|
||||||
RESTORE_MEKF_NONFINITE_RECOVERY = 2
|
RESTORE_MEKF_NONFINITE_RECOVERY = 2
|
||||||
UPDATE_TLE = 3
|
UPDATE_TLE = 3
|
||||||
|
READ_TLE = 4
|
||||||
|
|
||||||
|
|
||||||
CTRL_STRAT_DICT = {
|
CTRL_STRAT_DICT = {
|
||||||
@ -110,6 +111,7 @@ class OpCodes:
|
|||||||
RESET_MEKF = ["reset_mekf"]
|
RESET_MEKF = ["reset_mekf"]
|
||||||
RESTORE_MEKF_NONFINITE_RECOVERY = ["restore_mekf_nonfinite_recovery"]
|
RESTORE_MEKF_NONFINITE_RECOVERY = ["restore_mekf_nonfinite_recovery"]
|
||||||
UPDATE_TLE = ["update_tle"]
|
UPDATE_TLE = ["update_tle"]
|
||||||
|
READ_TLE = ["read_tle"]
|
||||||
SET_PARAMETER_SCALAR = ["set_scalar_param"]
|
SET_PARAMETER_SCALAR = ["set_scalar_param"]
|
||||||
SET_PARAMETER_VECTOR = ["set_vector_param"]
|
SET_PARAMETER_VECTOR = ["set_vector_param"]
|
||||||
SET_PARAMETER_MATRIX = ["set_matrix_param"]
|
SET_PARAMETER_MATRIX = ["set_matrix_param"]
|
||||||
@ -161,6 +163,7 @@ class Info:
|
|||||||
RESET_MEKF = "Reset the MEKF"
|
RESET_MEKF = "Reset the MEKF"
|
||||||
RESTORE_MEKF_NONFINITE_RECOVERY = "Restore MEKF non-finite recovery"
|
RESTORE_MEKF_NONFINITE_RECOVERY = "Restore MEKF non-finite recovery"
|
||||||
UPDATE_TLE = "Update TLE"
|
UPDATE_TLE = "Update TLE"
|
||||||
|
READ_TLE = "Read the currently stored TLE"
|
||||||
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
|
SET_PARAMETER_SCALAR = "Set Scalar Parameter"
|
||||||
SET_PARAMETER_VECTOR = "Set Vector Parameter"
|
SET_PARAMETER_VECTOR = "Set Vector Parameter"
|
||||||
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
|
SET_PARAMETER_MATRIX = "Set Matrix Parameter"
|
||||||
@ -229,6 +232,7 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper):
|
|||||||
info=Info.RESTORE_MEKF_NONFINITE_RECOVERY,
|
info=Info.RESTORE_MEKF_NONFINITE_RECOVERY,
|
||||||
)
|
)
|
||||||
oce.add(keys=OpCodes.UPDATE_TLE, info=Info.UPDATE_TLE)
|
oce.add(keys=OpCodes.UPDATE_TLE, info=Info.UPDATE_TLE)
|
||||||
|
oce.add(keys=OpCodes.READ_TLE, info=Info.READ_TLE)
|
||||||
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
|
oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR)
|
||||||
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
|
oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR)
|
||||||
oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX)
|
oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX)
|
||||||
@ -333,6 +337,9 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): # noqa C901
|
|||||||
print("The line does not have the required length of 69 characters")
|
print("The line does not have the required length of 69 characters")
|
||||||
tle = line1.encode() + line2.encode()
|
tle = line1.encode() + line2.encode()
|
||||||
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.UPDATE_TLE, tle))
|
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.UPDATE_TLE, tle))
|
||||||
|
elif op_code in OpCodes.READ_TLE:
|
||||||
|
q.add_log_cmd(f"{Info.READ_TLE}")
|
||||||
|
q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.READ_TLE))
|
||||||
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
|
elif op_code in OpCodes.SET_PARAMETER_SCALAR:
|
||||||
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
|
q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}")
|
||||||
set_acs_ctrl_param_scalar(q)
|
set_acs_ctrl_param_scalar(q)
|
||||||
@ -1247,6 +1254,22 @@ def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes):
|
|||||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_acs_ctrl_action_replies(
|
||||||
|
action_id: int, pw: PrintWrapper, custom_data: bytes
|
||||||
|
):
|
||||||
|
if action_id == ActionId.READ_TLE:
|
||||||
|
handle_read_tle(pw, custom_data)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_read_tle(pw: PrintWrapper, custom_data: bytes):
|
||||||
|
pw.dlog("Received TLE")
|
||||||
|
data_length = 69 * 2
|
||||||
|
if len(custom_data) != data_length:
|
||||||
|
raise ValueError(f"Received data of unexpected length {len(custom_data)}")
|
||||||
|
tle = custom_data.decode()
|
||||||
|
pw.dlog(f"{tle[0:69]}\n{tle[69:69*2]}")
|
||||||
|
|
||||||
|
|
||||||
def perform_mgm_calibration( # noqa C901: Complexity okay
|
def perform_mgm_calibration( # noqa C901: Complexity okay
|
||||||
pw: PrintWrapper, mgm_tuple: Tuple
|
pw: PrintWrapper, mgm_tuple: Tuple
|
||||||
): # noqa C901: Complexity okay
|
): # noqa C901: Complexity okay
|
||||||
|
Loading…
Reference in New Issue
Block a user