diff --git a/CHANGELOG.md b/CHANGELOG.md index 15d8024..729c27c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,16 @@ list yields a list of all related PRs for each release. # [unreleased] +## Added + +- Added ACS action cmds +- Added new ACS hk values +- Added ACS set parameter cmds + ## Fixed - Correction for ACS CTRL raw data requests HK type +- Fixed diag related ACS hk cmds ## Added @@ -316,4 +323,4 @@ tmtccmd v4.0.0rc0 - Extended heater commands for more informative output which component is heated See [milestones](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/milestones) -and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases) \ No newline at end of file +and [releases](https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/releases) diff --git a/eive_tmtc/tmtc/acs/acs_ctrl.py b/eive_tmtc/tmtc/acs/acs_ctrl.py index c7717ac..9df6fcf 100644 --- a/eive_tmtc/tmtc/acs/acs_ctrl.py +++ b/eive_tmtc/tmtc/acs/acs_ctrl.py @@ -14,6 +14,7 @@ from tmtccmd.config.tmtc import ( OpCodeEntry, ) from tmtccmd.tc import service_provider +from tmtccmd.tc.queue import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_mode import Mode, pack_mode_command from tmtccmd.tc.decorator import ServiceProviderParams from tmtccmd.tc.pus_3_fsfw_hk import ( @@ -23,8 +24,25 @@ from tmtccmd.tc.pus_3_fsfw_hk import ( disable_periodic_hk_command, create_request_one_diag_command, ) +from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.tc.pus_20_fsfw_param import ( + create_load_param_cmd +) + +from tmtccmd.pus.s20_fsfw_param_defs import ( + create_scalar_u8_parameter, + create_scalar_u16_parameter, + create_scalar_i32_parameter, + create_scalar_float_parameter, + create_scalar_double_parameter, + create_vector_float_parameter, + create_vector_double_parameter, + create_matrix_float_parameter, + create_matrix_double_parameter, +) + class SetId(enum.IntEnum): MGM_RAW_SET = 0 @@ -49,12 +67,24 @@ class Submode(enum.IntEnum): PTG_TARGET_GS = 15 PTG_INERTIAL = 16 +class ActionId(enum.IntEnum): + SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL = 0 + RESET_MEKF = 1 class OpCodes: OFF = ["off"] SAFE = ["normal_safe"] DTBL = ["normal_detumble"] IDLE = ["normal_idle"] + NADIR = ["normal_nadir"] + TARGET = ["normal_target"] + GS = ["normal_gs"] + INERTIAL = ["normal_inertial"] + SAFE_PTG = ["confirm_deployment"] + RESET_MEKF = ["reset_mekf"] + SET_PARAMETER_SCALAR = ["set_scalar_param"] + SET_PARAMETER_VECTOR = ["set_vector_param"] + SET_PARAMETER_MATRIX = ["set_matrix_param"] REQUEST_RAW_MGM_HK = ["0", "mgm_raw_hk"] ENABLE_RAW_MGM_HK = ["1", "mgm_raw_enable_hk"] DISABLE_RAW_MGM_HK = ["2", "mgm_raw_disable_hk"] @@ -89,9 +119,18 @@ class OpCodes: class Info: OFF = "Switch ACS CTRL off" - SAFE = "Switch ACS CTRL normal safe" - DTBL = "Switch ACS CTRL normal detumble" - IDLE = "Switch ACS CTRL normal idle" + SAFE = "Switch ACS CTRL normal - safe" + DTBL = "Switch ACS CTRL normal - detumble" + IDLE = "Switch ACS CTRL normal - idle" + NADIR = "Switch ACS CTRL normal - pointing nadir" + TARGET = "Switch ACS CTRL normal - pointing target" + GS = "Switch ACS CTRL normal - pointing target groundstation" + INERTIAL = "Switch ACS CTRL normal - pointing inertial" + SAFE_PTG = "Confirm deployment of both solar arrays" + RESET_MEKF = "Reset the MEKF" + SET_PARAMETER_SCALAR = "Set Scalar Parameter" + SET_PARAMETER_VECTOR = "Set Vector Parameter" + SET_PARAMETER_MATRIX = "Set Matrix Parameter" REQUEST_RAW_MGM_HK = "Request Raw MGM HK once" ENABLE_RAW_MGM_HK = "Enable Raw MGM HK data generation" DISABLE_RAW_MGM_HK = "Disable Raw MGM HK data generation" @@ -143,6 +182,15 @@ def acs_cmd_defs(defs: TmtcDefinitionWrapper): oce.add(keys=OpCodes.SAFE, info=Info.SAFE) oce.add(keys=OpCodes.DTBL, info=Info.DTBL) oce.add(keys=OpCodes.IDLE, info=Info.IDLE) + oce.add(keys=OpCodes.NADIR, info=Info.NADIR) + oce.add(keys=OpCodes.TARGET, info=Info.TARGET) + oce.add(keys=OpCodes.GS, info=Info.GS) + oce.add(keys=OpCodes.INERTIAL, info=Info.INERTIAL) + oce.add(keys=OpCodes.SAFE_PTG, info=Info.SAFE_PTG) + oce.add(keys=OpCodes.RESET_MEKF, info=Info.RESET_MEKF) + oce.add(keys=OpCodes.SET_PARAMETER_SCALAR, info=Info.SET_PARAMETER_SCALAR) + oce.add(keys=OpCodes.SET_PARAMETER_VECTOR, info=Info.SET_PARAMETER_VECTOR) + oce.add(keys=OpCodes.SET_PARAMETER_MATRIX, info=Info.SET_PARAMETER_MATRIX) oce.add(keys=OpCodes.REQUEST_RAW_MGM_HK, info=Info.REQUEST_RAW_MGM_HK) oce.add(keys=OpCodes.ENABLE_RAW_MGM_HK, info=Info.ENABLE_RAW_MGM_HK) oce.add(keys=OpCodes.DISABLE_RAW_MGM_HK, info=Info.DISABLE_RAW_MGM_HK) @@ -194,6 +242,33 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): elif op_code in OpCodes.IDLE: q.add_log_cmd(f"{Info.IDLE}") q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.IDLE)) + elif op_code in OpCodes.NADIR: + q.add_log_cmd(f"{Info.NADIR}") + q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_NADIR)) + elif op_code in OpCodes.TARGET: + q.add_log_cmd(f"{Info.TARGET}") + q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET)) + elif op_code in OpCodes.GS: + q.add_log_cmd(f"{Info.GS}") + q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_TARGET_GS)) + elif op_code in OpCodes.INERTIAL: + q.add_log_cmd(f"{Info.INERTIAL}") + q.add_pus_tc(pack_mode_command(ACS_CONTROLLER, Mode.NORMAL, Submode.PTG_INERTIAL)) + elif op_code in OpCodes.SAFE_PTG: + q.add_log_cmd(f"{Info.SAFE_PTG}") + q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.SOLAR_ARRAY_DEPLOYMENT_SUCCESSFUL)) + elif op_code in OpCodes.RESET_MEKF: + q.add_log_cmd(f"{Info.RESET_MEKF}") + q.add_pus_tc(create_action_cmd(ACS_CONTROLLER, ActionId.RESET_MEKF)) + elif op_code in OpCodes.SET_PARAMETER_SCALAR: + q.add_log_cmd(f"{Info.SET_PARAMETER_SCALAR}") + set_acs_ctrl_param_scalar(q) + elif op_code in OpCodes.SET_PARAMETER_VECTOR: + q.add_log_cmd(f"{Info.SET_PARAMETER_VECTOR}") + set_acs_ctrl_param_vector(q) + elif op_code in OpCodes.SET_PARAMETER_MATRIX: + q.add_log_cmd(f"{Info.SET_PARAMETER_MATRIX}") + set_acs_ctrl_param_matrix(q) elif op_code in OpCodes.REQUEST_RAW_MGM_HK: q.add_log_cmd(Info.REQUEST_RAW_MGM_HK) q.add_pus_tc( @@ -299,7 +374,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): elif op_code in OpCodes.ENABLE_PROC_GYR_HK: q.add_log_cmd(Info.ENABLE_PROC_GYR_HK) cmd_tuple = enable_periodic_hk_command_with_interval( - False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0 + True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET), 2.0 ) q.add_pus_tc(cmd_tuple[0]) q.add_pus_tc(cmd_tuple[1]) @@ -307,7 +382,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): q.add_log_cmd(Info.DISABLE_PROC_GYR_HK) q.add_pus_tc( disable_periodic_hk_command( - False, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET) + True, make_sid(ACS_CONTROLLER, SetId.GYR_PROC_SET) ) ) elif op_code in OpCodes.REQUEST_PROC_GPS_HK: @@ -331,7 +406,7 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): ) elif op_code in OpCodes.REQUEST_MEKF_HK: q.add_log_cmd(Info.REQUEST_MEKF_HK) - q.add_pus_tc(generate_one_hk_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA))) + q.add_pus_tc(create_request_one_diag_command(make_sid(ACS_CONTROLLER, SetId.MEKF_DATA))) elif op_code in OpCodes.ENABLE_MEKF_HK: q.add_log_cmd(Info.ENABLE_MEKF_HK) cmd_tuple = enable_periodic_hk_command_with_interval( @@ -386,6 +461,177 @@ def pack_acs_ctrl_command(p: ServiceProviderParams): logging.getLogger(__name__).info(f"Unknown op code {op_code}") +def set_acs_ctrl_param_scalar(q: DefaultPusQueueHelper): + pt = int(input("Specify parameter type to set {0: \"uint8\", 1: \"uint16\", 2: \"int32\", 3: \"float\", " + "4: \"double\"}: ")) + sid = int(input("Specify parameter struct ID to set: ")) + pid = int(input("Specify parameter ID to set: ")) + match pt: + case 0: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_u8_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 1: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_u16_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 2: + param = int(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_i32_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 3: + param = float(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_float_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + case 4: + param = float(input("Specify parameter value to set: ")) + q.add_pus_tc( + create_load_param_cmd( + create_scalar_double_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameter=param, + ).pack() + ) + ) + + +def set_acs_ctrl_param_vector(q: DefaultPusQueueHelper): + pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: ")) + sid = int(input("Specify parameter struct ID to set: ")) + pid = int(input("Specify parameter ID to set: ")) + match pt: + case 0: + elms = int(input("Specify number of elements in vector to set: ")) + param = [] + for _ in range(elms): + param.append(float(input("Specify parameter vector entry value to set: "))) + print(param) + if input("Confirm selected parameter values (Y/N): ") == "Y": + q.add_pus_tc( + create_load_param_cmd( + create_vector_float_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + q.add_log_cmd("Aborted by user input") + return + case 1: + elms = int(input("Specify number of elements in vector to set: ")) + param = [] + for _ in range(elms): + param.append(float(input("Specify parameter vector entry value to set: "))) + print(param) + if input("Confirm selected parameter values (Y/N): ") == "Y": + q.add_pus_tc( + create_load_param_cmd( + create_vector_double_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + q.add_log_cmd("Aborted by user input") + return + + +def set_acs_ctrl_param_matrix(q: DefaultPusQueueHelper): + pt = int(input("Specify parameter type to set {0: \"float\", 1: \"double\"}: ")) + sid = int(input("Specify parameter struct ID to set: ")) + pid = int(input("Specify parameter ID to set: ")) + match pt: + case 0: + rows = int(input("Specify number of rows in matrix to set: ")) + cols = int(input("Specify number of columns in matrix to set: ")) + row = [] + param = [] + for _ in range(rows): + for _ in range(cols): + row.append(float(input("Specify parameter vector entry value to set: "))) + param.append(row) + print(param) + if input("Confirm selected parameter values (Y/N): ") == "Y": + q.add_pus_tc( + create_load_param_cmd( + create_matrix_float_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + q.add_log_cmd("Aborted by user input") + return + case 1: + rows = int(input("Specify number of rows in matrix to set: ")) + cols = int(input("Specify number of columns in matrix to set: ")) + row = [] + param = [] + for _ in range(rows): + for _ in range(cols): + row.append(float(input("Specify parameter vector entry value to set: "))) + param.append(row) + row = [] + print(param) + if input("Confirm selected parameter values (Y/N): ") == "Y": + q.add_pus_tc( + create_load_param_cmd( + create_matrix_double_parameter( + object_id=ACS_CONTROLLER, + domain_id=sid, + unique_id=pid, + parameters=param, + ).pack() + ) + ) + else: + q.add_log_cmd("Aborted by user input") + return + + def handle_acs_ctrl_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): pw = PrintWrapper(printer) match set_id: @@ -684,14 +930,18 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): + mekf_status = {0 : "UNINITIALIZED", 1: "NO_GYR_DATA", 2: "NO_MODEL_VECTORS", 3: "NO_SUS_MGM_STR_DATA", + 4: "COVARIANCE_INVERSION_FAILED", 10: "INITIALIZED", 11: "RUNNING"} pw.dlog("Received MEKF Set") fmt_quat = "!dddd" fmt_str_4 = "[{:8.3f}, {:8.3f}, {:8.3f}, {:8.3f}]" fmt_str_3 = "[{:8.3f}, {:8.3f}, {:8.3f}]" fmt_vec = "!ddd" + fmt_sts = "!B" inc_len_quat = struct.calcsize(fmt_quat) inc_len_vec = struct.calcsize(fmt_vec) - if len(hk_data) < inc_len_quat + inc_len_vec: + inc_len_sts = struct.calcsize(fmt_sts) + if len(hk_data) < inc_len_quat + inc_len_vec + inc_len_sts: pw.dlog("Received HK set too small") return current_idx = 0 @@ -699,18 +949,23 @@ def handle_mekf_data(pw: PrintWrapper, hk_data: bytes): current_idx += inc_len_quat rate = struct.unpack(fmt_vec, hk_data[current_idx : current_idx + inc_len_vec]) current_idx += inc_len_vec + status = struct.unpack(fmt_sts, hk_data[current_idx : current_idx + inc_len_sts])[0] + current_idx += inc_len_sts + pw.dlog(f"{'MEKF Status'.ljust(25)}: {mekf_status[status]}") pw.dlog(f"{'MEKF Quaternion'.ljust(25)}: {fmt_str_4.format(*quat)}") pw.dlog(f"{'MEKF Rotational Rate'.ljust(25)}: {fmt_str_3.format(*rate)}") - pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=2) + pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes): pw.dlog("Received CTRL Values Set") fmt_quat = "!dddd" fmt_scalar = "!d" + fmt_vec = "!ddd" inc_len_quat = struct.calcsize(fmt_quat) inc_len_scalar = struct.calcsize(fmt_scalar) - if len(hk_data) < 2 * inc_len_quat + inc_len_scalar: + inc_len_vec = struct.calcsize(fmt_vec) + if len(hk_data) < 2 * inc_len_quat + inc_len_scalar + inc_len_vec: pw.dlog("Received HK set too small") return current_idx = 0 @@ -735,10 +990,18 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes): ) ] current_idx += inc_len_scalar + tgt_rot = [ + f"{val:8.3f}" + for val in struct.unpack( + fmt_vec, hk_data[current_idx : current_idx + inc_len_vec] + ) + ] + current_idx += inc_len_vec pw.dlog(f"Control Values Target Quaternion: {tgt_quat}") pw.dlog(f"Control Values Error Quaternion: {err_quat}") pw.dlog(f"Control Values Error Angle: {err_ang} [rad]") - pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=3) + pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [rad/s]") + pw.printer.print_validity_buffer(hk_data[current_idx:], num_vars=4) def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):