import enum import struct from typing import List from eive_tmtc.gomspace.gomspace_common import ( pack_set_u8_param_command, Channel, GomspaceDeviceActionId, prompt_and_pack_set_integer_param_command, prompt_and_pack_get_param_command, pack_request_config_command, pack_gnd_wdt_reset_command, ParamTypes, pack_reboot_command, ) from eive_tmtc.gomspace.gomspace_pdu_definitions import OUT_ENABLE_LIST from spacepackets.ecss import PusTelecommand from tmtccmd.config import OpCodeEntry from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_3_fsfw_hk import ( make_sid, generate_one_diag_command, generate_one_hk_command, enable_periodic_hk_command_with_interval, disable_periodic_hk_command, ) from tmtccmd.pus.s8_fsfw_funccmd import create_action_cmd from tmtccmd.util import ObjectIdU32, ObjectIdBase class GomspaceOpCode: GET_PARAM = ["get_param"] SET_INTEGER_PARAM = ["set_int_param"] SAVE_TABLE = ["save_table"] RESET_GND_WATCHDOG = ["reset_gnd_wdt"] SAVE_TABLE_DEFAULT = ["save_table_default"] LOAD_TABLE = ["load_table"] REBOOT = ["reboot"] REQUEST_CONFIG_TABLE = ["cfg_table"] class GsInfo: GET_PARAMETER = "Get parameter" SET_PARAMETER = "Set integer parameter" REQUEST_CONFIG_TABLE = "Request Config Table" RESET_GND_WATCHDOG = "Reset GND watchdog" SAVE_TABLE = "Save table non-volatile (file)" SAVE_TABLE_DEFAULT = "Save table non-volatile (default)" LOAD_TABLE = "Load Table" REBOOT = "Reboot PCDU module" class PowerInfo: INFO_CORE = "Core Information" INFO_AUX = "Auxiliary Information" SWITCHER_HK = "Switcher State Information" INFO_ALL = "All Information" REQUEST_SWITCHER_SET = "Request Switcher Information" ENABLE_INFO_HK = "Enable Core Info HK" DISABLE_INFO_HK = "Disable Core Info HK" RESET_ALL_GND_WDTS = "Reset all Ground Watchdogs" REQUEST_CORE_HK_ONCE = "Requesting Core HK once" REQUEST_AUX_HK_ONCE = "Requesting Aux HK once" PRINT_SWITCH_V_I = "Print Switch V I Info" PRINT_LATCHUPS = "Print latchups" class PowerOpCodes: # PDU 1 TCS_ON = ["tcs_on"] TCS_OFF = ["tcs_off"] SYRLINKS_ON = ["syrlinks_on"] SYRLINKS_OFF = ["syrlinks_off"] STAR_TRACKER_ON = ["str_on"] STAR_TRACKER_OFF = ["str_off"] MGT_ON = ["mgt_on"] MGT_OFF = ["mgt_off"] SUS_N_ON = ["sus_nom_on"] SUS_N_OFF = ["sus_nom_off"] SCEX_ON = ["scex_on"] SCEX_OFF = ["scex_off"] PLOC_ON = ["ploc_on"] PLOC_OFF = ["ploc_off"] ACS_A_ON = ["acs_a_on"] ACS_A_OFF = ["acs_a_off"] # PDU 2 PL_PCDU_VBAT_NOM_ON = ["plpcdu_vbat_nom_on"] PL_PCDU_VBAT_NOM_OFF = ["plpcdu_vbat_nom_off"] RW_ON = ["rw_on"] RW_OFF = ["rw_off"] HEATER_ON = ["heater_on"] HEATER_OFF = ["heater_off"] SUS_R_ON = ["sus_red_on"] SUS_R_OFF = ["sus_red_off"] SOLAR_ARRAY_DEPL_ON = ["sa_depl_on"] SOLAR_ARRAY_DEPL_OFF = ["sa_depl_off"] PL_PCDU_VBAT_RED_ON = ["plpcdu_vbat_red_on"] PL_PCDU_VBAT_RED_OFF = ["plpcdu_vbat_red_off"] ACS_B_ON = ["acs_b_on"] ACS_B_OFF = ["acs_b_off"] PL_CAM_ON = ["cam_on"] PL_CAM_OFF = ["cam_off"] REBOOT = ["reboot"] INFO_CORE = ["info"] SWITCHER_HK = ["switcher_states"] ENABLE_INFO_HK = ["info_hk_on"] DISABLE_INFO_HK = ["info_hk_off"] INFO_AUX = ["info_aux"] INFO_ALL = ["info_all"] RESET_ALL_GND_WDTS = ["reset_gnd_wdts"] # Request HK REQUEST_CORE_HK_ONCE = ["hk_core"] REQUEST_AUX_HK_ONCE = ["hk_aux"] PRINT_SWITCH_V_I = ["print_switch_vi"] PRINT_LATCHUPS = ["print_latchups"] class SetId(enum.IntEnum): CORE = 1 AUX = 2 CONFIG = 3 def pack_common_power_cmds( prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str ): objb = object_id.as_bytes if op_code in PowerOpCodes.ENABLE_INFO_HK: interval = float(input("Specify HK interval in floating point seconds: ")) q.add_log_cmd(f"{prefix}: {PowerInfo.ENABLE_INFO_HK} with interval {interval}") cmds = enable_periodic_hk_command_with_interval( True, make_sid(objb, SetId.CORE), interval ) for cmd in cmds: q.add_pus_tc(cmd) if op_code in PowerOpCodes.DISABLE_INFO_HK: q.add_log_cmd(f"{prefix}: {PowerInfo.DISABLE_INFO_HK}") q.add_pus_tc(disable_periodic_hk_command(True, make_sid(objb, SetId.CORE))) def pack_common_gomspace_cmds( # noqa C901: Complexity is okay here. prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str ): objb = object_id.as_bytes if op_code in PowerOpCodes.PRINT_SWITCH_V_I: q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_SWITCH_V_I}") q.add_pus_tc( create_action_cmd( object_id=objb, action_id=GomspaceDeviceActionId.PRINT_SWITCH_V_I ) ) if op_code in PowerOpCodes.REBOOT: q.add_log_cmd(f"{prefix}: {GsInfo.REBOOT}") q.add_pus_tc(pack_reboot_command(object_id)) if op_code in PowerOpCodes.PRINT_LATCHUPS: q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_LATCHUPS}") q.add_pus_tc( create_action_cmd( object_id=objb, action_id=GomspaceDeviceActionId.PRINT_LATCHUPS ) ) if op_code in GomspaceOpCode.SET_INTEGER_PARAM: q.add_log_cmd(f"{prefix}: {GsInfo.SET_PARAMETER}") print("Please specify the parameter type from index") for idx, v in enumerate(ParamTypes): print(f"{idx}: {v.name}") ptype = int(input("Index: ")) prompt_and_pack_set_integer_param_command(q, object_id, ParamTypes(ptype)) if op_code in GomspaceOpCode.GET_PARAM: q.add_log_cmd(f"{prefix}: {GsInfo.GET_PARAMETER}") prompt_and_pack_get_param_command(q, object_id) if op_code in GomspaceOpCode.REQUEST_CONFIG_TABLE: q.add_log_cmd(f"{prefix}: {GsInfo.REQUEST_CONFIG_TABLE}") q.add_pus_tc(pack_request_config_command(object_id.as_bytes)) if op_code in GomspaceOpCode.SAVE_TABLE: q.add_log_cmd(f"{prefix}: {GsInfo.SAVE_TABLE}") source_table = int( input( "Source table [0: Board Config, 1: Module Config, " "2: Calibration Parameter, 4: TM Data]: " ) ) if source_table not in [0, 1, 2, 4]: raise ValueError("Invalid source table index") # Not used for now """ target_table = int(input( "Target table. [Default: Source table]: " )) """ q.add_pus_tc( create_action_cmd( object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.SAVE_TABLE, user_data=bytes([source_table]), ) ) if op_code in GomspaceOpCode.SAVE_TABLE_DEFAULT: source_table = int( input( "Source table [0: Board Config, 1: Module Config, 2: Calibration Parameter]: " ) ) if source_table not in [0, 1, 2]: raise ValueError("Invalid source table index") q.add_pus_tc( create_action_cmd( object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.SAVE_TABLE_DEFAULT, user_data=bytes([source_table]), ) ) if op_code in GomspaceOpCode.LOAD_TABLE: target_table = int( input( "Target table ID [0: Board Config, 1: Module Config, 2: Calibration Parameter, " "4: HK TM]: " ) ) if target_table not in [0, 1, 2, 4]: raise ValueError("Invalid source table index") if target_table != 4: source_table = int( input( "Source table (file or default) [0: Board Config, 1: Module Config, " "2: Calibration Parameter, value + 4 for default table]: " ) ) if source_table not in [0, 1, 2, 4, 5, 6]: raise ValueError("Invalid source table index") else: # Will be ignored source_table = 4 q.add_pus_tc( create_action_cmd( object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.LOAD_TABLE, user_data=bytes([source_table, target_table]), ) ) if op_code in GomspaceOpCode.RESET_GND_WATCHDOG: q.add_log_cmd(f"{prefix}: {GsInfo.RESET_GND_WATCHDOG}") q.add_pus_tc(pack_gnd_wdt_reset_command(object_id)) def pack_reset_gnd_wdt_cmd( q: DefaultPusQueueHelper, prefix: str, object_id: ObjectIdBase ): q.add_log_cmd(f"{prefix}: {GsInfo.RESET_GND_WATCHDOG}") q.add_pus_tc(pack_gnd_wdt_reset_command(object_id)) def req_hk_cmds( prefix: str, q: DefaultPusQueueHelper, op_code: str, obj_id: bytes, set_id_pair: [int, int], ): if op_code in PowerOpCodes.REQUEST_CORE_HK_ONCE: q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_CORE_HK_ONCE}") hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[0]) q.add_pus_tc(generate_one_diag_command(sid=hk_sid)) if op_code in PowerOpCodes.REQUEST_AUX_HK_ONCE: q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_AUX_HK_ONCE}") hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[1]) q.add_pus_tc(generate_one_hk_command(sid=hk_sid)) def pack_pdu_disable_safe_off_cmd() -> PusTelecommand: pass def generic_on_cmd( object_id: bytes, q: DefaultPusQueueHelper, info_str: str, out_idx: int ): q.add_log_cmd(info_str + " on") q.add_pus_tc(create_generic_on_cmd(object_id, out_idx)) def create_generic_on_cmd(object_id: bytes, out_idx: int): return pack_set_u8_param_command( object_id, OUT_ENABLE_LIST[out_idx].parameter_address, Channel.on, ) def generic_off_cmd( object_id: bytes, q: DefaultPusQueueHelper, info_str: str, out_idx: int ): q.add_log_cmd(info_str + " off") q.add_pus_tc(create_generic_off_cmd(object_id, out_idx)) def create_generic_off_cmd(object_id: bytes, out_idx: int): return pack_set_u8_param_command( object_id, OUT_ENABLE_LIST[out_idx].parameter_address, Channel.off, ) def add_common_power_defs(oce: OpCodeEntry): oce.add(keys=PowerOpCodes.REQUEST_CORE_HK_ONCE, info=PowerInfo.REQUEST_CORE_HK_ONCE) oce.add(keys=PowerOpCodes.REQUEST_AUX_HK_ONCE, info=PowerInfo.REQUEST_AUX_HK_ONCE) oce.add(keys=PowerOpCodes.ENABLE_INFO_HK, info=PowerInfo.ENABLE_INFO_HK) oce.add(keys=PowerOpCodes.DISABLE_INFO_HK, info=PowerInfo.DISABLE_INFO_HK) def add_gomspace_cmd_defs(oce: OpCodeEntry): oce.add( keys=PowerOpCodes.REQUEST_CORE_HK_ONCE, info=PowerInfo.REQUEST_CORE_HK_ONCE, ) oce.add( keys=PowerOpCodes.REQUEST_AUX_HK_ONCE, info=PowerInfo.REQUEST_AUX_HK_ONCE, ) oce.add(keys=PowerOpCodes.PRINT_LATCHUPS, info=PowerInfo.PRINT_LATCHUPS) oce.add(keys=GomspaceOpCode.GET_PARAM, info=GsInfo.GET_PARAMETER) oce.add(keys=GomspaceOpCode.REBOOT, info=GsInfo.REBOOT) oce.add(keys=GomspaceOpCode.SET_INTEGER_PARAM, info=GsInfo.SET_PARAMETER) oce.add(keys=GomspaceOpCode.REQUEST_CONFIG_TABLE, info=GsInfo.REQUEST_CONFIG_TABLE) oce.add(keys=GomspaceOpCode.SAVE_TABLE, info=GsInfo.SAVE_TABLE) oce.add(keys=GomspaceOpCode.SAVE_TABLE_DEFAULT, info=GsInfo.SAVE_TABLE_DEFAULT) oce.add(keys=GomspaceOpCode.LOAD_TABLE, info=GsInfo.LOAD_TABLE) oce.add(keys=GomspaceOpCode.RESET_GND_WATCHDOG, info=GsInfo.RESET_GND_WATCHDOG) OBC_ENDIANNESS = "<" def unpack_array_in_data( data: bytes, start_addr: int, width: int, entries: int, struct_spec: str ) -> List: return [ struct.unpack( f"{OBC_ENDIANNESS}{struct_spec}", data[start_addr + (i * width) : start_addr + ((i + 1) * width)], )[0] for i in range(entries) ]