clean up and extend power TMTC handling

This commit is contained in:
Robin Müller 2022-08-26 23:52:47 +02:00
parent 0f9aed4ee2
commit 6c59fb95d3
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
10 changed files with 108 additions and 157 deletions

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 77c940a5a71e57aabc341e77e5b5d415043e9c9b
Subproject commit 30129ee38f0d1627ae0cb1a327a39ab4e3e2795f

View File

@ -23,19 +23,20 @@ class GomspaceDeviceActionIds(enum.IntEnum):
PARAM_SET = 255
WDT_RESET = 9
REQUEST_HK_TABLE = 16
REQUEST_CONFIG_TABLE = 17,
REQUEST_CONFIG_TABLE = 17
PRINT_SWITCH_V_I = 32
PRINT_LATCHUPS = 33
class GomspaceOpCodes:
# Request HK
REQUEST_CORE_HK_ONCE = ["hk_core", "128"]
REQUEST_AUX_HK_ONCE = ["hk_aux", "129"]
PRINT_SWITCH_V_I = ["print-switch-vi", "130"]
PRINT_LATCHUPS = ["print-latchups", "131"]
GET_PARAM = ["get_param", "132"]
SET_PARAM = ["set_param", "133"]
REQUEST_CORE_HK_ONCE = ["hk_core"]
REQUEST_AUX_HK_ONCE = ["hk_aux"]
PRINT_SWITCH_V_I = ["print_switch_vi"]
PRINT_LATCHUPS = ["print_latchups"]
GET_PARAM = ["get_param"]
SET_PARAM = ["set_param"]
REQUEST_CONFIG_TABLE = ["cfg_table"]
class GsInfo:
@ -45,6 +46,7 @@ class GsInfo:
PRINT_LATCHUPS = "Print latchups"
GET_PARAMETER = "Get parameter"
SET_PARAMETER = "Set parameter"
REQUEST_CONFIG_TABLE = "Request Config Table"
class SetIds:
@ -78,12 +80,9 @@ class Channel:
off = 0
def pack_request_config_command(
object_id: bytes
) -> PusTelecommand:
def pack_request_config_command(object_id: bytes) -> PusTelecommand:
return make_fsfw_action_cmd(
object_id=object_id,
action_id=GomspaceDeviceActionIds.REQUEST_CONFIG_TABLE
object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_CONFIG_TABLE
)
@ -115,11 +114,7 @@ def pack_get_param_command(
def pack_set_param_command(
object_id: bytes,
memory_address: bytes,
parameter_size: int,
parameter: int,
ssc: int = 0,
object_id: bytes, memory_address: bytes, parameter_size: int, parameter: int
) -> PusTelecommand:
"""Function to generate a command to set a parameter
:param object_id: The object id of the gomspace device handler.
@ -127,7 +122,6 @@ def pack_set_param_command(
:param parameter: The parameter value to set.
:param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t
in the device tables.
:param ssc:
:return: The command as bytearray.
"""
action_id = GomspaceDeviceActionIds.PARAM_SET

View File

@ -1,11 +1,11 @@
import struct
from config.object_ids import *
from tmtc.power.common_power import handle_get_param_data_reply
from pus_tc.devs.imtq import ImtqActionIds
from pus_tm.defs import PrintWrapper
from tmtc.ploc_mpsoc import PlocReplyIds
from tmtc.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from tmtc.power.tm import handle_get_param_data_reply
from tmtccmd.logging import get_console_logger
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter

View File

@ -5,6 +5,7 @@
"""
from config.definitions import CustomServiceList
from tmtc.power.common_power import add_gomspace_cmds
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
@ -81,28 +82,7 @@ def add_acu_cmds(defs: TmtcDefinitionWrapper):
def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Handling ACU command")
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("ACU: Print channel stats")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=object_id.as_bytes,
action_id=gs.GomspaceDeviceActionIds.PRINT_SWITCH_V_I,
)
)
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
q.add_log_cmd(f"ACU: {GsInfo.REQUEST_CORE_HK_ONCE}")
hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE)
q.add_pus_tc(generate_one_diag_command(sid=hk_sid))
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
q.add_log_cmd(f"ACU: {GsInfo.REQUEST_AUX_HK_ONCE}")
hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX)
q.add_pus_tc(generate_one_hk_command(sid=hk_sid))
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"ACU: {GsInfo.GET_PARAMETER}")
gs.prompt_and_pack_get_param_command(q, object_id)
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"ACU: {GsInfo.SET_PARAMETER}")
gs.prompt_and_pack_set_param_command(q, object_id)
add_gomspace_cmds("ACU", object_id, q, op_code)
pack_test_cmds(object_id=object_id, q=q)

View File

@ -1,5 +1,4 @@
import enum
import struct
from config.object_ids import PDU_1_HANDLER_ID, PDU_2_HANDLER_ID
from gomspace.gomspace_common import (
@ -9,9 +8,11 @@ from gomspace.gomspace_common import (
GsInfo,
SetIds,
GomspaceDeviceActionIds,
prompt_and_pack_set_param_command,
prompt_and_pack_get_param_command,
pack_request_config_command,
)
from gomspace.gomspace_pdu_definitions import PDU_CONFIG_LIST
from pus_tm.defs import PrintWrapper
from tmtccmd.config import OpCodeEntry
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
@ -19,8 +20,8 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_diag_command,
generate_one_hk_command,
)
from tmtccmd.util import ObjectIdU32, ObjectIdBase
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32
class Pdu1ChIndex(enum.IntEnum):
@ -187,6 +188,35 @@ def add_pdu2_common_defs(oce: OpCodeEntry):
oce.add(keys=PowerOpCodes.PL_CAM_OFF, info=info_off_pdu2(Pdu2InfoBase.PL_CAM))
def add_gomspace_cmds(
prefix: str, object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
objb = object_id.as_bytes
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd(f"{prefix}: {GsInfo.PRINT_SWITCH_V_I}")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
)
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
q.add_log_cmd(f"{prefix}: {GsInfo.PRINT_LATCHUPS}")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
)
)
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"{prefix}: {GsInfo.SET_PARAMETER}")
prompt_and_pack_set_param_command(q, object_id)
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"{prefix}: {GsInfo.GET_PARAMETER}")
prompt_and_pack_get_param_command(q, object_id)
if op_code in GomspaceOpCodes.REQUEST_CONFIG_TABLE:
q.add_log_cmd(f"{prefix}: {GsInfo.REQUEST_CONFIG_TABLE}")
q.add_pus_tc(pack_request_config_command(object_id.as_bytes))
def pdu1_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in PowerOpCodes.TCS_ON:
tcs_on_cmd(q)
@ -447,28 +477,16 @@ def generic_off_cmd(
)
def handle_get_param_data_reply(
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray
):
if action_id == GomspaceDeviceActionIds.PARAM_GET:
pw.dlog(f"Parameter Get Request received for object {obj_id}")
header_list = [
"Gomspace Request Code",
"Table ID",
"Memory Address",
"Payload length",
"Payload",
]
fmt_str = "!BBHH"
(gs_request_code, table_id, address, payload_length) = struct.unpack(
fmt_str, custom_data[:6]
)
content_list = [
hex(gs_request_code),
table_id,
hex(address),
payload_length,
f"0x[{custom_data[6:].hex(sep=',')}]"
]
pw.dlog(f"{header_list}")
pw.dlog(f"{content_list}")
def add_gomspace_cmd_defs(oce: OpCodeEntry):
oce.add(
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info=GsInfo.PRINT_LATCHUPS)
oce.add(keys=GomspaceOpCodes.SET_PARAM, info=GsInfo.SET_PARAMETER)
oce.add(keys=GomspaceOpCodes.REQUEST_CONFIG_TABLE, info=GsInfo.REQUEST_CONFIG_TABLE)

View File

@ -5,8 +5,13 @@
@author J. Meier
@date 13.12.2020
"""
from tmtc.power.common_power import add_gomspace_cmds
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid, generate_one_diag_command
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import (
GsInfo,
GomspaceOpCodes,
@ -139,28 +144,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
Channel.off,
)
)
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
q.add_log_cmd("P60 Dock: Requesting HK Core HK Once")
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_CORE)
q.add_pus_tc(generate_one_diag_command(sid=hk_sid))
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
q.add_log_cmd("P60 Dock: Requesting HK Aux HK Once")
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_AUX)
q.add_pus_tc(generate_one_hk_command(sid=hk_sid))
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("P60 Dock: Print Switches, Voltages, Currents")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
)
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
q.add_log_cmd("P60 Dock: Print Latchups")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
)
)
add_gomspace_cmds("P60 Dock", object_id, q, op_code)
if P60DockTestProcedure.all or P60DockTestProcedure.reboot:
q.add_log_cmd("P60 Dock: Reboot")
q.add_pus_tc(pack_reboot_command(object_id))

View File

@ -4,7 +4,7 @@
@date 17.12.2020
"""
import gomspace.gomspace_common as gs
from tmtc.power.common_power import pdu1_cmds, pdu1_req_hk_cmds
from tmtc.power.common_power import pdu1_cmds, pdu1_req_hk_cmds, add_gomspace_cmds
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
@ -32,26 +32,7 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
objb = object_id.as_bytes
pdu1_cmds(q, op_code)
pdu1_req_hk_cmds(q, op_code)
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("PDU1: Print Switches, Voltages, Currents")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
)
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
q.add_log_cmd("PDU1: Print Latchups")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
)
)
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.SET_PARAMETER}")
prompt_and_pack_set_param_command(q, object_id)
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.GET_PARAMETER}")
gs.prompt_and_pack_get_param_command(q, object_id)
add_gomspace_cmds("PDU1", object_id, q, op_code)
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
q.add_log_cmd("PDU1: Ping Test")
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

View File

@ -6,7 +6,7 @@
@author J. Meier
@date 17.12.2020
"""
from tmtc.power.common_power import pdu2_cmds, pdu2_req_hk_cmds
from tmtc.power.common_power import pdu2_cmds, pdu2_req_hk_cmds, add_gomspace_cmds
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
@ -43,26 +43,7 @@ def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
objb = object_id.as_bytes
pdu2_cmds(q, op_code)
pdu2_req_hk_cmds(q, op_code)
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd(f"PDU2: {GsInfo.PRINT_SWITCH_V_I}")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I
)
)
if op_code in GomspaceOpCodes.PRINT_LATCHUPS:
q.add_log_cmd("PDU2: Print Latchups")
q.add_pus_tc(
make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
)
)
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU2: {GsInfo.SET_PARAMETER}")
prompt_and_pack_set_param_command(q, object_id)
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"PDU2: {GsInfo.GET_PARAMETER}")
prompt_and_pack_get_param_command(q, object_id)
add_gomspace_cmds("PDU2", object_id, q, op_code)
if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
q.add_log_cmd("PDU2: Reboot")
q.add_pus_tc(pack_reboot_command(object_id))

View File

@ -8,6 +8,7 @@ from tmtc.power.common_power import (
pdu2_cmds,
pdu1_req_hk_cmds,
pdu2_req_hk_cmds,
add_gomspace_cmd_defs,
)
from config.definitions import CustomServiceList
from tmtccmd import get_console_logger
@ -50,7 +51,7 @@ def add_p60_cmds(defs: TmtcDefinitionWrapper):
oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=P60Info.STACK_3V3_OFF)
oce.add(keys=P60OpCodes.STACK_5V_ON, info=P60Info.STACK_5V_ON)
oce.add(keys=P60OpCodes.STACK_5V_OFF, info=P60Info.STACK_5V_OFF)
add_gomspace_cmds(oce)
add_gomspace_cmd_defs(oce)
oce.add(keys=P60OpCodes.TEST, info="P60 Tests")
defs.add_service(
name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce
@ -76,7 +77,7 @@ def add_power_cmd_defs(defs: TmtcDefinitionWrapper):
def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
add_pdu1_common_defs(oce)
add_gomspace_cmds(oce)
add_gomspace_cmd_defs(oce)
oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE)
oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
oce.add(
@ -96,7 +97,7 @@ def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
add_pdu2_common_defs(oce)
add_gomspace_cmds(oce)
add_gomspace_cmd_defs(oce)
oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents",
@ -112,20 +113,6 @@ def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
)
def add_gomspace_cmds(oce: OpCodeEntry):
oce.add(
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info=GsInfo.PRINT_LATCHUPS)
oce.add(keys=GomspaceOpCodes.SET_PARAM, info=GsInfo.SET_PARAMETER)
def add_pcdu_cmds(defs: TmtcDefinitionWrapper):
add_p60_cmds(defs)
add_pdu1_cmds(defs)

View File

@ -1,9 +1,10 @@
import struct
from typing import List, Tuple
from tmtccmd.util import ObjectIdBase
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from pus_tm.defs import PrintWrapper
from gomspace.gomspace_common import SetIds
from gomspace.gomspace_common import SetIds, GomspaceDeviceActionIds
P60_INDEX_LIST = [
"ACU VCC",
@ -378,9 +379,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
f"{i} | {str(voltages[i]).ljust(4)} | {str(currents[i]).ljust(4)} | "
f"{str(vboosts[i]).ljust(4)} | {str(powers[i]).ljust(2)}"
)
pw.dlog(
f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}"
)
pw.dlog(f"Temperatures in C: Ch0 {tmp0} | Ch1 {tmp1} | Ch2 {tmp2}")
pw.dlog(
f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
@ -423,3 +422,30 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
)
dev_parser.print(pw=pw)
printer.print_validity_buffer(validity_buffer=hk_data[current_idx:], num_vars=8)
def handle_get_param_data_reply(
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray
):
if action_id == GomspaceDeviceActionIds.PARAM_GET:
pw.dlog(f"Parameter Get Request received for object {obj_id}")
header_list = [
"Gomspace Request Code",
"Table ID",
"Memory Address",
"Payload length",
"Payload",
]
fmt_str = "!BBHH"
(gs_request_code, table_id, address, payload_length) = struct.unpack(
fmt_str, custom_data[:6]
)
content_list = [
hex(gs_request_code),
table_id,
hex(address),
payload_length,
f"0x[{custom_data[6:].hex(sep=',')}]",
]
pw.dlog(f"{header_list}")
pw.dlog(f"{content_list}")