diff --git a/eive_tmtc/tmtc/acs/acs_ctrl.py b/eive_tmtc/tmtc/acs/acs_ctrl.py index 16e398a..2c19d68 100644 --- a/eive_tmtc/tmtc/acs/acs_ctrl.py +++ b/eive_tmtc/tmtc/acs/acs_ctrl.py @@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import ( OpCodeEntry, ) from tmtccmd.tc import service_provider +from tmtccmd.tc.queue import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.pus_3_fsfw_hk import ( @@ -26,6 +27,20 @@ from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.tc.pus_20_fsfw_param import ( + create_load_param_cmd +) + +from tmtccmd.pus.s20_fsfw_param_defs import ( + create_scalar_u8_parameter, + create_scalar_u16_parameter, + create_scalar_u32_parameter, + create_scalar_float_parameter, + create_scalar_double_parameter, + create_vector_float_parameter, + create_vector_double_parameter, +) + class SetId(enum.IntEnum): MGM_RAW_SET = 0 @@ -65,6 +80,8 @@ class OpCodes: INERTIAL = ["normal_inertial"] SAFE_PTG = ["confirm_deployment"] RESET_MEKF = ["reset_mekf"] + SET_PARAMETER_SCALAR = ["set_scalar_param"] + SET_PARAMETER_VECTOR = ["set_vector_param"] REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"] ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"] DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"] @@ -108,6 +125,9 @@ class Info: INERTIAL = "Switch ACS CTRL normal - pointing inertial" SAFE_PTG = "Confirm deployment of both solar arrays" RESET_MEKF = "Reset the MEKF" + SET_PARAMETER_SCALAR = "Set Scalar Parameter" + SET_PARAMETER_VECTOR = "Set Vector Parameter" + SET_PARAMETER_MATRIX = "Set Matrix Parameter" REQUEST_RAW_MGM_HK = "Request Raw MGM HK once" ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation" DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation" @@ -165,6 +185,8 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper): oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL) oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG) oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF) + oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR) + oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR) oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK) oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK) oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK) @@ -234,6 +256,12 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): elif op_code in OpCodes.RESET_MEKF: q.add_log_cmd(f"{Info.RESET_MEKF}") q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF)) + elif op_code in OpCodes.SET_PARAMETER_SCALAR: + q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}") + set_acs_ctrl_param_scalar(q) + elif op_code in OpCodes.SET_PARAMETER_VECTOR: + q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}") + set_acs_ctrl_param_vector(q) elif op_code in OpCodes.REQUEST_RAW_MGM_HK: q.add_log_cmd(Info.REQUEST_RAW_MGM_HK) q.add_pus_tc( @@ -426,6 +454,121 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): logging.getLogger(__name__).info(f"Unknown op code {op_code}") +def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper): + pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"uint32\", 3: \"float\", " + "4: \"double\"}: ")) + sid = int(input("Specify parameter struct ID to set: ")) + pid = int(input("Specify parameter ID to set: ")) + match pt: + case 0: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_u8_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 1: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_u16_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 2: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_u32_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 3: + param = float(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 4: + param = float(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_double_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + + +def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper): + pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: ")) + sid = int(input("Specify parameter struct ID to set: ")) + pid = int(input("Specify parameter ID to set: ")) + match pt: + case 0: + nr = int(input("Specify number of elements in vector to set: ")) + param = [] + for _ in range(nr): + param.append(float(input("Specify parameter vector entry value to set: "))) + print(param) + if bool(input("Confirm selected parameter values (Y/N): ")): + q.add_pus_tc( + create_load_param_cmd( + create_vector_float_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + print("Aborting") + return + case 1: + nr = int(input("Specify number of elements in vector to set: ")) + param = [] + for _ in range(nr): + param.append(float(input("Specify parameter vector entry value to set: "))) + print(param) + if bool(input("Confirm selected parameter values (Y/N): ")): + q.add_pus_tc( + create_load_param_cmd( + create_vector_double_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + print("Aborting") + return + + def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer) match set_id: @@ -724,7 +867,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): - mekfStatus = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA", + mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA", 4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"} pw.dlog("Received MEKF Set") fmt_quat = "!dddd" @@ -745,7 +888,7 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): current_idx += inc_len_vec status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0] current_idx += inc_len_sts - pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekfStatus[status]}") + pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}") pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}") pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}") pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3)