Merge pull request 'Power Commands Update' (#108) from mueller/power-cmds-update into main

Reviewed-on: #108
Reviewed-by: Jakob Meier <meierj@irs.uni-stuttgart.de>
This commit is contained in:
Jakob Meier 2022-08-25 14:48:41 +02:00
commit 5c0def704f
10 changed files with 116 additions and 93 deletions

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="PLOC SUPV" type="PythonConfigurationType" factoryName="Python" folderName="Devices"> <configuration default="false" name="PLOC SUPV" type="PythonConfigurationType" factoryName="Python" folderName="PLOC">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />

View File

@ -15,6 +15,9 @@ list yields a list of all related PRs for each release.
- Major Update for `tmtccmd` and `spacepackets` dependencies which improves user API significantly. - Major Update for `tmtccmd` and `spacepackets` dependencies which improves user API significantly.
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/102 PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/102
- Add commands to request MGM HK or enable/disable periodic HK for it - Add commands to request MGM HK or enable/disable periodic HK for it
- Update power commands: Extend param get and set to all
gomspace devices.
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/108
- Update PLOC supervisor commands - Update PLOC supervisor commands
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/107 PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/107

View File

@ -11,6 +11,7 @@ import struct
from typing import Union from typing import Union
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -28,18 +29,19 @@ class GomspaceDeviceActionIds(enum.IntEnum):
class GomspaceOpCodes: class GomspaceOpCodes:
# Request HK # Request HK
REQUEST_CORE_HK_ONCE = ["hk-core", "128"] REQUEST_CORE_HK_ONCE = ["hk_core", "128"]
REQUEST_AUX_HK_ONCE = ["hk-aux", "129"] REQUEST_AUX_HK_ONCE = ["hk_aux", "129"]
PRINT_SWITCH_V_I = ["print-switch-vi", "130"] PRINT_SWITCH_V_I = ["print-switch-vi", "130"]
PRINT_LATCHUPS = ["print-latchups", "131"] PRINT_LATCHUPS = ["print-latchups", "131"]
GET_PARAM = ["get-param", "132"] GET_PARAM = ["get_param", "132"]
SET_PARAM = ["set-param", "133"] SET_PARAM = ["set_param", "133"]
class GsInfo: class GsInfo:
REQUEST_CORE_HK_ONCE = "Requesting Core HK once" REQUEST_CORE_HK_ONCE = "Requesting Core HK once"
REQUEST_AUX_HK_ONCE = "Requesting Aux HK once" REQUEST_AUX_HK_ONCE = "Requesting Aux HK once"
PRINT_SWITCH_V_I = "Print Switch V I Info" PRINT_SWITCH_V_I = "Print Switch V I Info"
PRINT_LATCHUPS = "Print latchups"
GET_PARAMETER = "Get parameter" GET_PARAMETER = "Get parameter"
SET_PARAMETER = "Set parameter" SET_PARAMETER = "Set parameter"
@ -136,7 +138,30 @@ def pack_set_param_command(
app_data.append(byte_three) app_data.append(byte_three)
app_data.append(byte_four) app_data.append(byte_four)
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id, action_id=action_id, user_data=app_data, ssc=ssc object_id=object_id, action_id=action_id, user_data=app_data
)
def prompt_and_pack_get_param_command(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
table_id = int(input("Specify table ID: "))
memory_address = int(input("Specify memory address: 0x"), 16)
parameter_size = int(input("Specify parameter size: "))
q.add_pus_tc(
pack_get_param_command(
object_id.as_bytes, table_id, memory_address, parameter_size
)
)
def prompt_and_pack_set_param_command(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
memory_address = int(input("Specify memory address: 0x"), 16)
memory_address = struct.pack("!H", memory_address)
parameter_size = int(input("Specify parameter size: "))
parameter = int(input("Specify parameter: "))
q.add_pus_tc(
pack_set_param_command(
object_id.as_bytes, memory_address, parameter_size, parameter
)
) )

View File

@ -91,34 +91,19 @@ def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
) )
) )
if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE: if op_code in GomspaceOpCodes.REQUEST_CORE_HK_ONCE:
q.add_log_cmd(f"PDU1: {GsInfo.REQUEST_CORE_HK_ONCE}") q.add_log_cmd(f"ACU: {GsInfo.REQUEST_CORE_HK_ONCE}")
hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE) hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_CORE)
q.add_pus_tc(generate_one_diag_command(sid=hk_sid)) q.add_pus_tc(generate_one_diag_command(sid=hk_sid))
if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE: if op_code in GomspaceOpCodes.REQUEST_AUX_HK_ONCE:
q.add_log_cmd(f"PDU1: {GsInfo.REQUEST_AUX_HK_ONCE}") q.add_log_cmd(f"ACU: {GsInfo.REQUEST_AUX_HK_ONCE}")
hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX) hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX)
q.add_pus_tc(generate_one_hk_command(sid=hk_sid)) q.add_pus_tc(generate_one_hk_command(sid=hk_sid))
if op_code in GomspaceOpCodes.GET_PARAM: if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.GET_PARAMETER}") q.add_log_cmd(f"ACU: {GsInfo.GET_PARAMETER}")
table_id = int(input("Specify table ID: ")) gs.prompt_and_pack_get_param_command(q, object_id)
memory_address = int(input("Specify memory address: 0x"), 16)
parameter_size = int(input("Specify parameter size: "))
q.add_pus_tc(
gs.pack_get_param_command(
object_id.as_bytes, table_id, memory_address, parameter_size
)
)
if op_code in GomspaceOpCodes.SET_PARAM: if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.SET_PARAMETER}") q.add_log_cmd(f"ACU: {GsInfo.SET_PARAMETER}")
memory_address = int(input("Specify memory address: 0x"), 16) gs.prompt_and_pack_set_param_command(q, object_id)
memory_address = struct.pack("!H", memory_address)
parameter_size = int(input("Specify parameter size: "))
parameter = int(input("Specify parameter: "))
q.add_pus_tc(
gs.pack_set_param_command(
object_id.as_bytes, memory_address, parameter_size, parameter
)
)
pack_test_cmds(object_id=object_id, q=q) pack_test_cmds(object_id=object_id, q=q)

View File

@ -1,4 +1,5 @@
import enum import enum
import struct
from config.object_ids import PDU_1_HANDLER_ID, PDU_2_HANDLER_ID from config.object_ids import PDU_1_HANDLER_ID, PDU_2_HANDLER_ID
from gomspace.gomspace_common import ( from gomspace.gomspace_common import (
@ -7,8 +8,10 @@ from gomspace.gomspace_common import (
GomspaceOpCodes, GomspaceOpCodes,
GsInfo, GsInfo,
SetIds, SetIds,
GomspaceDeviceActionIds,
) )
from gomspace.gomspace_pdu_definitions import PDU_CONFIG_LIST from gomspace.gomspace_pdu_definitions import PDU_CONFIG_LIST
from pus_tm.defs import PrintWrapper
from tmtccmd.config import OpCodeEntry from tmtccmd.config import OpCodeEntry
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
@ -16,6 +19,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_diag_command, generate_one_diag_command,
generate_one_hk_command, generate_one_hk_command,
) )
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
class Pdu1ChIndex(enum.IntEnum): class Pdu1ChIndex(enum.IntEnum):
@ -440,3 +444,29 @@ def generic_off_cmd(
Channel.off, Channel.off,
) )
) )
def handle_get_param_data_reply(
action_id: int, pw: PrintWrapper, custom_data: bytearray
):
if action_id == GomspaceDeviceActionIds.PARAM_GET:
header_list = [
"Gomspace action ID",
"Table ID",
"Memory Address",
"Payload length",
"Payload",
]
fmt_str = "!BBHH"
(action, table_id, address, payload_length) = struct.unpack(
fmt_str, custom_data[:6]
)
content_list = [
action,
table_id,
hex(address),
payload_length,
"0x" + custom_data[6:].hex(),
]
pw.dlog(f"{header_list}")
pw.dlog(f"{content_list}")

View File

@ -53,6 +53,12 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
) )
) )
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.SET_PARAMETER}")
prompt_and_pack_set_param_command(q, object_id)
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.GET_PARAMETER}")
gs.prompt_and_pack_get_param_command(q, object_id)
if PDU1TestProcedure.all or PDU1TestProcedure.ping: if PDU1TestProcedure.all or PDU1TestProcedure.ping:
q.add_log_cmd("PDU1: Ping Test") q.add_log_cmd("PDU1: Ping Test")
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
@ -107,14 +113,3 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
Channel.off, Channel.off,
) )
) )
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU1: {GsInfo.SET_PARAMETER}")
memory_address = int(input("Specify memory address: 0x"), 16)
memory_address = struct.pack("!H", memory_address)
parameter_size = int(input("Specify parameter size: "))
parameter = int(input("Specify parameter: "))
q.add_pus_tc(
gs.pack_set_param_command(
object_id.as_bytes, memory_address, parameter_size, parameter
)
)

View File

@ -6,16 +6,9 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
generate_one_diag_command,
make_sid,
)
from pus_tc.devs.common_power import pdu2_cmds, pdu2_req_hk_cmds from pus_tc.devs.common_power import pdu2_cmds, pdu2_req_hk_cmds
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import * from gomspace.gomspace_pdu_definitions import *
from config.object_ids import PDU_2_HANDLER_ID
class PDU2TestProcedure: class PDU2TestProcedure:
@ -64,6 +57,12 @@ def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS
) )
) )
if op_code in GomspaceOpCodes.SET_PARAM:
q.add_log_cmd(f"PDU2: {GsInfo.SET_PARAMETER}")
prompt_and_pack_set_param_command(q, object_id)
if op_code in GomspaceOpCodes.GET_PARAM:
q.add_log_cmd(f"PDU2: {GsInfo.GET_PARAMETER}")
prompt_and_pack_get_param_command(q, object_id)
if PDU2TestProcedure.all or PDU2TestProcedure.reboot: if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
q.add_log_cmd("PDU2: Reboot") q.add_log_cmd("PDU2: Reboot")
q.add_pus_tc(pack_reboot_command(object_id)) q.add_pus_tc(pack_reboot_command(object_id))

View File

@ -50,13 +50,7 @@ def add_p60_cmds(defs: TmtcDefinitionWrapper):
oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=P60Info.STACK_3V3_OFF) oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=P60Info.STACK_3V3_OFF)
oce.add(keys=P60OpCodes.STACK_5V_ON, info=P60Info.STACK_5V_ON) oce.add(keys=P60OpCodes.STACK_5V_ON, info=P60Info.STACK_5V_ON)
oce.add(keys=P60OpCodes.STACK_5V_OFF, info=P60Info.STACK_5V_OFF) oce.add(keys=P60OpCodes.STACK_5V_OFF, info=P60Info.STACK_5V_OFF)
oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE) add_gomspace_cmds(oce)
oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="P60 Dock: Print Switches, Voltages, Currents",
)
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="P60 Dock: Print Latchups")
oce.add(keys=P60OpCodes.TEST, info="P60 Tests") oce.add(keys=P60OpCodes.TEST, info="P60 Tests")
defs.add_service( defs.add_service(
name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce
@ -82,14 +76,15 @@ def add_power_cmd_defs(defs: TmtcDefinitionWrapper):
def add_pdu1_cmds(defs: TmtcDefinitionWrapper): def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
add_pdu1_common_defs(oce) add_pdu1_common_defs(oce)
add_gomspace_cmds(oce)
oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE) oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE)
oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE) oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
oce.add( oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I, keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU1: Print Switches, Voltages, Currents", info="PDU1: Print Switches, Voltages, Currents",
) )
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="PDU1: Print Latchups") oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
oce.add(keys=GomspaceOpCodes.SET_PARAM, info="Set parameter")
defs.add_service( defs.add_service(
name=CustomServiceList.PDU1.value, name=CustomServiceList.PDU1.value,
info="PDU1 Device", info="PDU1 Device",
@ -101,14 +96,7 @@ def add_pdu1_cmds(defs: TmtcDefinitionWrapper):
def add_pdu2_cmds(defs: TmtcDefinitionWrapper): def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
add_pdu2_common_defs(oce) add_pdu2_common_defs(oce)
oce.add( add_gomspace_cmds(oce)
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
oce.add( oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I, keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents", info="PDU2: Print Switches, Voltages, Currents",
@ -124,6 +112,20 @@ def add_pdu2_cmds(defs: TmtcDefinitionWrapper):
) )
def add_gomspace_cmds(oce: OpCodeEntry):
oce.add(
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
oce.add(keys=GomspaceOpCodes.GET_PARAM, info=GsInfo.GET_PARAMETER)
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info=GsInfo.PRINT_LATCHUPS)
oce.add(keys=GomspaceOpCodes.SET_PARAM, info=GsInfo.SET_PARAMETER)
def add_pcdu_cmds(defs: TmtcDefinitionWrapper): def add_pcdu_cmds(defs: TmtcDefinitionWrapper):
add_p60_cmds(defs) add_p60_cmds(defs)
add_pdu1_cmds(defs) add_pdu1_cmds(defs)

View File

@ -250,7 +250,6 @@ def perform_reboot_cmd(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.XSC_REBOOT, action_id=ActionIds.XSC_REBOOT,
user_data=tc_data, user_data=tc_data
ssc=0,
) )
) )

View File

@ -1,10 +1,11 @@
import struct import struct
from config.object_ids import * from config.object_ids import *
from pus_tc.devs.common_power import handle_get_param_data_reply
from pus_tc.devs.imtq import ImtqActionIds from pus_tc.devs.imtq import ImtqActionIds
from pus_tm.defs import PrintWrapper
from tmtc.ploc_mpsoc import PlocReplyIds from tmtc.ploc_mpsoc import PlocReplyIds
from tmtc.ploc_supervisor import SupvActionIds from tmtc.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds from pus_tc.devs.star_tracker import StarTrackerActionIds
from gomspace.gomspace_common import GomspaceDeviceActionIds
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tm import Service8FsfwTm from tmtccmd.tm import Service8FsfwTm
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
@ -21,13 +22,13 @@ def handle_action_reply(
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm) tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet) printer.handle_long_tm_print(packet_if=tm_packet, info_if=tm_packet)
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes) object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes)
pw = PrintWrapper(printer)
custom_data = tm_packet.custom_data custom_data = tm_packet.custom_data
action_id = tm_packet.action_id action_id = tm_packet.action_id
generic_print_str = printer.generic_action_packet_tm_print( generic_print_str = printer.generic_action_packet_tm_print(
packet=tm_packet, obj_id=object_id packet=tm_packet, obj_id=object_id
) )
print(generic_print_str) pw.dlog(generic_print_str)
printer.file_logger.info(generic_print_str)
if object_id.as_bytes == IMTQ_HANDLER_ID: if object_id.as_bytes == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, printer, custom_data) return handle_imtq_replies(action_id, printer, custom_data)
elif object_id.as_bytes == PLOC_MPSOC_ID: elif object_id.as_bytes == PLOC_MPSOC_ID:
@ -36,8 +37,16 @@ def handle_action_reply(
return handle_supervisor_replies(action_id, printer, custom_data) return handle_supervisor_replies(action_id, printer, custom_data)
elif object_id.as_bytes == STAR_TRACKER_ID: elif object_id.as_bytes == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, printer, custom_data) return handle_startracker_replies(action_id, printer, custom_data)
elif object_id.as_bytes == ACU_HANDLER_ID: elif object_id.as_bytes in [
return handle_acu_replies(action_id, printer, custom_data) ACU_HANDLER_ID,
PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
P60_DOCK_HANDLER,
]:
return handle_get_param_data_reply(action_id, pw, custom_data)
else:
pw.dlog(f"No dedicated action reply handler found for reply from {object_id}")
pw.dlog(f"Raw Data: {tm_packet.custom_data.hex(sep=',')}")
def handle_imtq_replies( def handle_imtq_replies(
@ -123,27 +132,3 @@ def handle_startracker_replies(
print(content_list) print(content_list)
printer.file_logger.info(header_list) printer.file_logger.info(header_list)
printer.file_logger.info(content_list) printer.file_logger.info(content_list)
def handle_acu_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == GomspaceDeviceActionIds.PARAM_GET:
header_list = [
"Gomspace action ID" "Table ID",
"Memory Address",
"Payload length" "Payload",
]
fmt_str = "!BBHH"
(action, table_id, address, payload_length) = struct.unpack(
fmt_str, custom_data[:6]
)
content_list = [
action,
table_id,
"0x" + hex(address),
payload_length,
"0x" + custom_data[6:].hex(),
]
print(header_list)
print(content_list)