342 lines
12 KiB
Python
342 lines
12 KiB
Python
import struct
from typing import List, Sequence

from spacepackets.ecss import PusTelecommand
from tmtccmd.config import OpCodeEntry
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
    make_sid,
    generate_one_diag_command,
    generate_one_hk_command,
    enable_periodic_hk_command_with_interval,
    disable_periodic_hk_command,
)
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32, ObjectIdBase

from eive_tmtc.gomspace.gomspace_common import (
    pack_set_u8_param_command,
    Channel,
    GomspaceDeviceActionIds,
    prompt_and_pack_set_integer_param_command,
    prompt_and_pack_get_param_command,
    pack_request_config_command,
    pack_gnd_wdt_reset_command,
    ParamTypes,
    pack_reboot_command,
)
from eive_tmtc.gomspace.gomspace_pdu_definitions import OUT_ENABLE_LIST
|
|
|
|
|
|
class GomspaceOpCodes:
    """Op code keys for the generic Gomspace device commands.

    Every attribute is a list of accepted op code strings; the packing
    functions in this module match a user supplied op code with
    ``op_code in <attribute>``.
    """

    # Parameter access
    GET_PARAM = ["get_param"]
    SET_INTEGER_PARAM = ["set_int_param"]
    # Table handling and watchdog
    SAVE_TABLE = ["save_table"]
    RESET_GND_WATCHDOG = ["reset_gnd_wdt"]
    SAVE_TABLE_DEFAULT = ["save_table_default"]
    LOAD_TABLE = ["load_table"]
    # Device level commands
    REBOOT = ["reboot"]
    REQUEST_CONFIG_TABLE = ["cfg_table"]
|
|
|
|
|
|
class GsInfo:
    """Human readable descriptions for the generic Gomspace commands.

    The strings are used as log command text and as the ``info`` argument
    when op codes are registered.
    """

    # Parameter and configuration access
    GET_PARAMETER = "Get parameter"
    SET_PARAMETER = "Set integer parameter"
    REQUEST_CONFIG_TABLE = "Request Config Table"
    # Watchdog
    RESET_GND_WATCHDOG = "Reset GND watchdog"
    # Table handling
    SAVE_TABLE = "Save table non-volatile (file)"
    SAVE_TABLE_DEFAULT = "Save table non-volatile (default)"
    LOAD_TABLE = "Load Table"
    # Device level commands
    REBOOT = "Reboot PCDU module"
|
|
|
|
|
|
class PowerInfo:
    """Human readable descriptions for the common power commands.

    Each string is the counterpart of the equally named key list in
    ``PowerOpCodes``.
    """

    # Information printouts
    INFO_CORE = "Core Information"
    INFO_AUX = "Auxiliary Information"
    INFO_ALL = "All Information"
    # Periodic housekeeping control
    ENABLE_INFO_HK = "Enable Core Info HK"
    DISABLE_INFO_HK = "Disable Core Info HK"
    # Watchdogs
    RESET_ALL_GND_WDTS = "Reset all Ground Watchdogs"
    # One-shot housekeeping requests
    REQUEST_CORE_HK_ONCE = "Requesting Core HK once"
    REQUEST_AUX_HK_ONCE = "Requesting Aux HK once"
    # Device side printouts
    PRINT_SWITCH_V_I = "Print Switch V I Info"
    PRINT_LATCHUPS = "Print latchups"
|
|
|
|
|
|
class PowerOpCodes:
    """Op code keys for power switching and power information commands.

    Every attribute is a list of accepted op code strings. The on/off pairs
    are grouped by the PDU they are listed under; the remaining keys cover
    reboot, information, housekeeping and printout commands.
    """

    # PDU 1
    TCS_ON = ["tcs_on"]
    TCS_OFF = ["tcs_off"]
    SYRLINKS_ON = ["syrlinks_on"]
    SYRLINKS_OFF = ["syrlinks_off"]
    STAR_TRACKER_ON = ["str_on"]
    STAR_TRACKER_OFF = ["str_off"]
    MGT_ON = ["mgt_on"]
    MGT_OFF = ["mgt_off"]
    SUS_N_ON = ["sus_nom_on"]
    SUS_N_OFF = ["sus_nom_off"]
    SCEX_ON = ["scex_on"]
    SCEX_OFF = ["scex_off"]
    PLOC_ON = ["ploc_on"]
    PLOC_OFF = ["ploc_off"]
    ACS_A_ON = ["acs_a_on"]
    ACS_A_OFF = ["acs_a_off"]

    # PDU 2
    PL_PCDU_VBAT_NOM_ON = ["plpcdu_vbat_nom_on"]
    PL_PCDU_VBAT_NOM_OFF = ["plpcdu_vbat_nom_off"]
    RW_ON = ["rw_on"]
    RW_OFF = ["rw_off"]
    HEATER_ON = ["heater_on"]
    HEATER_OFF = ["heater_off"]
    SUS_R_ON = ["sus_red_on"]
    SUS_R_OFF = ["sus_red_off"]
    SOLAR_ARRAY_DEPL_ON = ["sa_depl_on"]
    SOLAR_ARRAY_DEPL_OFF = ["sa_depl_off"]
    PL_PCDU_VBAT_RED_ON = ["plpcdu_vbat_red_on"]
    PL_PCDU_VBAT_RED_OFF = ["plpcdu_vbat_red_off"]
    ACS_B_ON = ["acs_b_on"]
    ACS_B_OFF = ["acs_b_off"]
    PL_CAM_ON = ["cam_on"]
    PL_CAM_OFF = ["cam_off"]

    # Reboot, information and watchdog commands
    REBOOT = ["reboot"]
    INFO_CORE = ["info"]
    ENABLE_INFO_HK = ["info_hk_on"]
    DISABLE_INFO_HK = ["info_hk_off"]
    INFO_AUX = ["info_aux"]
    INFO_ALL = ["info_all"]
    RESET_ALL_GND_WDTS = ["reset_gnd_wdts"]
    # Request HK
    REQUEST_CORE_HK_ONCE = ["hk_core"]
    REQUEST_AUX_HK_ONCE = ["hk_aux"]
    # Device side printouts
    PRINT_SWITCH_V_I = ["print_switch_vi"]
    PRINT_LATCHUPS = ["print_latchups"]
|
|
|
|
|
|
class SetIds:
    """Numeric dataset IDs that are combined with an object ID via ``make_sid``."""

    CORE = 1  # used for the core info HK enable/disable commands
    AUX = 2
    CONFIG = 3
|
|
|
|
|
|
def pack_common_power_cmds(
    prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
    """Queue the commands which enable or disable the periodic core info HK.

    :param prefix: Text placed in front of every log command.
    :param object_id: Target object; its raw bytes are used to build the SID.
    :param q: Queue helper receiving the log commands and telecommands.
    :param op_code: Op code selecting the action. Op codes other than the
        enable/disable info HK keys add nothing to the queue.
    :raises ValueError: If the interactively entered interval is not a float.
    """
    raw_object_id = object_id.as_bytes
    if op_code in PowerOpCodes.ENABLE_INFO_HK:
        # The interval is requested interactively before anything is queued.
        hk_interval = float(input("Specify HK interval in floating point seconds: "))
        q.add_log_cmd(
            f"{prefix}: {PowerInfo.ENABLE_INFO_HK} with interval {hk_interval}"
        )
        for enable_tc in enable_periodic_hk_command_with_interval(
            True, make_sid(raw_object_id, SetIds.CORE), hk_interval
        ):
            q.add_pus_tc(enable_tc)
    if op_code in PowerOpCodes.DISABLE_INFO_HK:
        q.add_log_cmd(f"{prefix}: {PowerInfo.DISABLE_INFO_HK}")
        core_sid = make_sid(raw_object_id, SetIds.CORE)
        q.add_pus_tc(disable_periodic_hk_command(True, core_sid))
|
|
|
|
|
|
def _prompt_table_index(prompt: str, valid_indices: List[int], role: str) -> int:
    """Read a table index from stdin and reject values outside ``valid_indices``.

    :param prompt: Text shown to the operator.
    :param valid_indices: Table indices which are accepted.
    :param role: Word used in the error message (``"source"`` or ``"target"``).
    :return: The validated table index.
    :raises ValueError: If the input is not an integer or not an accepted index.
    """
    table_idx = int(input(prompt))
    if table_idx not in valid_indices:
        raise ValueError(f"Invalid {role} table index")
    return table_idx


def pack_common_gomspace_cmds(
    prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
    """Queue the generic Gomspace device command selected by ``op_code``.

    Handles the switch V/I and latchup printouts, reboot, parameter get/set,
    config table request, table save/load and the ground watchdog reset.
    Several branches prompt the operator on stdin for additional input.
    Every handled branch queues a log command in front of its telecommand.

    :param prefix: Text placed in front of every log command.
    :param object_id: Target device object ID.
    :param q: Queue helper receiving the log commands and telecommands.
    :param op_code: Op code selecting the action; unknown op codes add nothing.
    :raises ValueError: If an interactively entered value is not an integer or
        is not a valid table index / parameter type.
    """
    objb = object_id.as_bytes
    if op_code in PowerOpCodes.PRINT_SWITCH_V_I:
        q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_SWITCH_V_I}")
        q.add_pus_tc(
            make_fsfw_action_cmd(
                object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
            )
        )
    if op_code in PowerOpCodes.REBOOT:
        q.add_log_cmd(f"{prefix}: {GsInfo.REBOOT}")
        q.add_pus_tc(pack_reboot_command(object_id))
    if op_code in PowerOpCodes.PRINT_LATCHUPS:
        q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_LATCHUPS}")
        q.add_pus_tc(
            make_fsfw_action_cmd(
                object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
            )
        )
    if op_code in GomspaceOpCodes.SET_INTEGER_PARAM:
        q.add_log_cmd(f"{prefix}: {GsInfo.SET_PARAMETER}")
        print("Please specify the parameter type from index")
        # NOTE(review): the printed index is the enumeration position while the
        # lookup below is by enum value; this assumes both coincide - confirm
        # against the ParamTypes definition.
        for idx, v in enumerate(ParamTypes):
            print(f"{idx}: {v.name}")
        ptype = int(input("Index: "))
        prompt_and_pack_set_integer_param_command(q, object_id, ParamTypes(ptype))
    if op_code in GomspaceOpCodes.GET_PARAM:
        q.add_log_cmd(f"{prefix}: {GsInfo.GET_PARAMETER}")
        prompt_and_pack_get_param_command(q, object_id)
    if op_code in GomspaceOpCodes.REQUEST_CONFIG_TABLE:
        q.add_log_cmd(f"{prefix}: {GsInfo.REQUEST_CONFIG_TABLE}")
        q.add_pus_tc(pack_request_config_command(objb))
    if op_code in GomspaceOpCodes.SAVE_TABLE:
        q.add_log_cmd(f"{prefix}: {GsInfo.SAVE_TABLE}")
        source_table = _prompt_table_index(
            "Source table [0: Board Config, 1: Module Config, "
            "2: Calibration Parameter, 4: TM Data]: ",
            [0, 1, 2, 4],
            "source",
        )
        # No separate target table is requested; only the source table index
        # is sent as user data.
        q.add_pus_tc(
            make_fsfw_action_cmd(
                object_id=objb,
                action_id=GomspaceDeviceActionIds.SAVE_TABLE,
                user_data=bytes([source_table]),
            )
        )
    if op_code in GomspaceOpCodes.SAVE_TABLE_DEFAULT:
        q.add_log_cmd(f"{prefix}: {GsInfo.SAVE_TABLE_DEFAULT}")
        source_table = _prompt_table_index(
            "Source table [0: Board Config, 1: Module Config, "
            "2: Calibration Parameter]: ",
            [0, 1, 2],
            "source",
        )
        q.add_pus_tc(
            make_fsfw_action_cmd(
                object_id=objb,
                action_id=GomspaceDeviceActionIds.SAVE_TABLE_DEFAULT,
                user_data=bytes([source_table]),
            )
        )
    if op_code in GomspaceOpCodes.LOAD_TABLE:
        q.add_log_cmd(f"{prefix}: {GsInfo.LOAD_TABLE}")
        target_table = _prompt_table_index(
            "Target table ID [0: Board Config, 1: Module Config, "
            "2: Calibration Parameter, "
            "4: HK TM]: ",
            [0, 1, 2, 4],
            "target",
        )
        if target_table != 4:
            source_table = _prompt_table_index(
                "Source table (file or default) [0: Board Config, 1: Module Config, "
                "2: Calibration Parameter, value + 4 for default table]: ",
                [0, 1, 2, 4, 5, 6],
                "source",
            )
        else:
            # Will be ignored
            source_table = 4
        q.add_pus_tc(
            make_fsfw_action_cmd(
                object_id=objb,
                action_id=GomspaceDeviceActionIds.LOAD_TABLE,
                user_data=bytes([source_table, target_table]),
            )
        )
    if op_code in GomspaceOpCodes.RESET_GND_WATCHDOG:
        # Same log command + telecommand pair as the standalone helper.
        pack_reset_gnd_wdt_cmd(q, prefix, object_id)
|
|
|
|
|
|
def pack_reset_gnd_wdt_cmd(
    q: DefaultPusQueueHelper, prefix: str, object_id: ObjectIdBase
):
    """Queue a log entry followed by the ground watchdog reset telecommand.

    :param q: Queue helper receiving the log command and the telecommand.
    :param prefix: Text placed in front of the log command.
    :param object_id: Object ID handed to ``pack_gnd_wdt_reset_command``.
    """
    log_text = f"{prefix}: {GsInfo.RESET_GND_WATCHDOG}"
    q.add_log_cmd(log_text)
    reset_tc = pack_gnd_wdt_reset_command(object_id)
    q.add_pus_tc(reset_tc)
|
|
|
|
|
|
def req_hk_cmds(
    prefix: str,
    q: DefaultPusQueueHelper,
    op_code: str,
    obj_id: bytes,
    set_id_pair: Sequence[int],
):
    """Queue a one-shot housekeeping request for the core or auxiliary set.

    :param prefix: Text placed in front of every log command.
    :param q: Queue helper receiving the log commands and telecommands.
    :param op_code: Op code selecting the request; unknown op codes add nothing.
    :param obj_id: Raw object ID used to build the structure ID.
    :param set_id_pair: Two set IDs; index 0 is used for the core request and
        index 1 for the auxiliary request.
    """
    if op_code in PowerOpCodes.REQUEST_CORE_HK_ONCE:
        q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_CORE_HK_ONCE}")
        hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[0])
        # The core set is requested with the diagnostic variant of the command.
        q.add_pus_tc(generate_one_diag_command(sid=hk_sid))
    if op_code in PowerOpCodes.REQUEST_AUX_HK_ONCE:
        q.add_log_cmd(f"{prefix}: {PowerInfo.REQUEST_AUX_HK_ONCE}")
        hk_sid = make_sid(object_id=obj_id, set_id=set_id_pair[1])
        # The auxiliary set is requested with the regular HK command.
        q.add_pus_tc(generate_one_hk_command(sid=hk_sid))
|
|
|
|
|
|
def pack_pdu_disable_safe_off_cmd() -> PusTelecommand:
    """Placeholder without an implementation.

    NOTE(review): currently returns ``None`` although the signature announces
    a ``PusTelecommand``; callers must not rely on the return value.
    """
    return None
|
|
|
|
|
|
def generic_on_cmd(
    object_id: bytes, q: DefaultPusQueueHelper, info_str: str, out_idx: int
):
    """Queue a log entry and the command which switches one output on.

    :param object_id: Raw object ID of the target device.
    :param q: Queue helper receiving the log command and the telecommand.
    :param info_str: Log text; ``" on"`` is appended to it.
    :param out_idx: Index into ``OUT_ENABLE_LIST`` selecting the output.
    """
    log_text = info_str + " on"
    q.add_log_cmd(log_text)
    switch_on_tc = create_generic_on_cmd(object_id, out_idx)
    q.add_pus_tc(switch_on_tc)
|
|
|
|
|
|
def create_generic_on_cmd(object_id: bytes, out_idx: int):
    """Build the u8 parameter command which sets one output to ``Channel.on``.

    :param object_id: Raw object ID of the target device.
    :param out_idx: Index into ``OUT_ENABLE_LIST`` selecting the output.
    :return: Whatever ``pack_set_u8_param_command`` returns for that request.
    :raises IndexError: If ``out_idx`` is outside of ``OUT_ENABLE_LIST``.
    """
    enable_param_addr = OUT_ENABLE_LIST[out_idx].parameter_address
    return pack_set_u8_param_command(object_id, enable_param_addr, Channel.on)
|
|
|
|
|
|
def generic_off_cmd(
    object_id: bytes, q: DefaultPusQueueHelper, info_str: str, out_idx: int
):
    """Queue a log entry and the command which switches one output off.

    :param object_id: Raw object ID of the target device.
    :param q: Queue helper receiving the log command and the telecommand.
    :param info_str: Log text; ``" off"`` is appended to it.
    :param out_idx: Index into ``OUT_ENABLE_LIST`` selecting the output.
    """
    log_text = info_str + " off"
    q.add_log_cmd(log_text)
    switch_off_tc = create_generic_off_cmd(object_id, out_idx)
    q.add_pus_tc(switch_off_tc)
|
|
|
|
|
|
def create_generic_off_cmd(object_id: bytes, out_idx: int):
    """Build the u8 parameter command which sets one output to ``Channel.off``.

    :param object_id: Raw object ID of the target device.
    :param out_idx: Index into ``OUT_ENABLE_LIST`` selecting the output.
    :return: Whatever ``pack_set_u8_param_command`` returns for that request.
    :raises IndexError: If ``out_idx`` is outside of ``OUT_ENABLE_LIST``.
    """
    enable_param_addr = OUT_ENABLE_LIST[out_idx].parameter_address
    return pack_set_u8_param_command(object_id, enable_param_addr, Channel.off)
|
|
|
|
|
|
def add_common_power_defs(oce: OpCodeEntry):
    """Register the HK request and info HK enable/disable op codes on ``oce``.

    :param oce: Op code entry which receives the key/info pairs.
    """
    key_info_pairs = (
        (PowerOpCodes.REQUEST_CORE_HK_ONCE, PowerInfo.REQUEST_CORE_HK_ONCE),
        (PowerOpCodes.REQUEST_AUX_HK_ONCE, PowerInfo.REQUEST_AUX_HK_ONCE),
        (PowerOpCodes.ENABLE_INFO_HK, PowerInfo.ENABLE_INFO_HK),
        (PowerOpCodes.DISABLE_INFO_HK, PowerInfo.DISABLE_INFO_HK),
    )
    # Registration order is the order of the table above.
    for op_code_keys, description in key_info_pairs:
        oce.add(keys=op_code_keys, info=description)
|
|
|
|
|
|
def add_gomspace_cmd_defs(oce: OpCodeEntry):
    """Register the generic Gomspace device op codes on ``oce``.

    :param oce: Op code entry which receives the key/info pairs.
    """
    key_info_pairs = (
        (PowerOpCodes.REQUEST_CORE_HK_ONCE, PowerInfo.REQUEST_CORE_HK_ONCE),
        (PowerOpCodes.REQUEST_AUX_HK_ONCE, PowerInfo.REQUEST_AUX_HK_ONCE),
        (PowerOpCodes.PRINT_LATCHUPS, PowerInfo.PRINT_LATCHUPS),
        (GomspaceOpCodes.GET_PARAM, GsInfo.GET_PARAMETER),
        (GomspaceOpCodes.REBOOT, GsInfo.REBOOT),
        (GomspaceOpCodes.SET_INTEGER_PARAM, GsInfo.SET_PARAMETER),
        (GomspaceOpCodes.REQUEST_CONFIG_TABLE, GsInfo.REQUEST_CONFIG_TABLE),
        (GomspaceOpCodes.SAVE_TABLE, GsInfo.SAVE_TABLE),
        (GomspaceOpCodes.SAVE_TABLE_DEFAULT, GsInfo.SAVE_TABLE_DEFAULT),
        (GomspaceOpCodes.LOAD_TABLE, GsInfo.LOAD_TABLE),
        (GomspaceOpCodes.RESET_GND_WATCHDOG, GsInfo.RESET_GND_WATCHDOG),
    )
    # Registration order is the order of the table above.
    for op_code_keys, description in key_info_pairs:
        oce.add(keys=op_code_keys, info=description)
|
|
|
|
|
|
# Byte order prefix prepended to every struct format used for unpacking.
OBC_ENDIANNESS = "<"


def unpack_array_in_data(
    data: bytes, start_addr: int, width: int, entries: int, struct_spec: str
) -> List:
    """Unpack ``entries`` consecutive fixed-width values from ``data``.

    :param data: Raw buffer holding the array.
    :param start_addr: Offset of the first entry inside ``data``.
    :param width: Size of one entry in bytes; each slice has exactly this length.
    :param entries: Number of entries to unpack.
    :param struct_spec: ``struct`` format for one entry, without byte order
        prefix; ``OBC_ENDIANNESS`` is prepended.
    :return: List with the first unpacked field of every entry.
    :raises struct.error: If a slice does not match the size of the format.
    """
    values = []
    for entry_idx in range(entries):
        offset = start_addr + entry_idx * width
        raw_entry = data[offset : offset + width]
        unpacked = struct.unpack(f"{OBC_ENDIANNESS}{struct_spec}", raw_entry)
        values.append(unpacked[0])
    return values
|