commands to set and get pcdu parameters

This commit is contained in:
Jakob Meier 2022-06-10 11:02:08 +02:00
parent bfc9b17f10
commit 2a245b1714
7 changed files with 89 additions and 16 deletions

View File

@ -7,6 +7,7 @@
@date 17.12.2020 @date 17.12.2020
""" """
import enum import enum
import struct
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.definitions import PusTelecommand from tmtccmd.tc.definitions import PusTelecommand
@ -30,11 +31,15 @@ class GomspaceOpCodes:
REQUEST_AUX_HK_ONCE = ["hk-aux", "129"] REQUEST_AUX_HK_ONCE = ["hk-aux", "129"]
PRINT_SWITCH_V_I = ["print-switch-vi", "130"] PRINT_SWITCH_V_I = ["print-switch-vi", "130"]
PRINT_LATCHUPS = ["print-latchups", "131"] PRINT_LATCHUPS = ["print-latchups", "131"]
GET_PARAM = ["get-param", "132"]
SET_PARAM = ["set-param", "133"]
class Info: class Info:
REQUEST_CORE_HK_ONCE = "Requesting Core HK once" REQUEST_CORE_HK_ONCE = "Requesting Core HK once"
REQUEST_AUX_HK_ONCE = "Requesting Aux HK once" REQUEST_AUX_HK_ONCE = "Requesting Aux HK once"
GET_PARAMETER = "Get parameter"
SET_PARAMETER = "Set parameter"
class SetIds: class SetIds:
@ -78,10 +83,9 @@ def pack_get_param_command(
@param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2 @param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2
@return: The command as bytearray. @return: The command as bytearray.
""" """
app_data = bytearray() app_data = struct.pack('!B', table_id)
app_data.append(table_id) app_data += struct.pack('!H', memory_address)
app_data.extend(memory_address) app_data += struct.pack('!B', parameter_size)
app_data.append(parameter_size)
return generate_action_command( return generate_action_command(
object_id=object_id, object_id=object_id,
action_id=GomspaceDeviceActionIds.PARAM_GET, action_id=GomspaceDeviceActionIds.PARAM_GET,
@ -112,10 +116,7 @@ def pack_set_param_command(
if parameter_size == 1: if parameter_size == 1:
app_data.append(parameter) app_data.append(parameter)
elif parameter_size == 2: elif parameter_size == 2:
byte_one = 0xFF00 & parameter >> 8 app_data += struct.pack('!H', parameter)
byte_two = 0xFF & parameter
app_data.append(byte_one)
app_data.append(byte_two)
elif parameter_size == 4: elif parameter_size == 4:
byte_one = 0xFF000000 & parameter >> 24 byte_one = 0xFF000000 & parameter >> 24
byte_two = 0xFF0000 & parameter >> 16 byte_two = 0xFF0000 & parameter >> 16

View File

@ -3,6 +3,8 @@
@author J. Meier, R. Mueller @author J. Meier, R. Mueller
@date 21.12.2020 @date 21.12.2020
""" """
import struct
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import add_op_code_entry, add_service_op_code_entry from tmtccmd.config import add_op_code_entry, add_service_op_code_entry
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
@ -63,6 +65,16 @@ def add_acu_cmds(cmd_dict: ServiceOpCodeDictT):
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE,
) )
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.GET_PARAM,
info=GsInfo.GET_PARAMETER,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.SET_PARAM,
info=GsInfo.SET_PARAMETER,
)
add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST) add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST)
add_service_op_code_entry( add_service_op_code_entry(
srv_op_code_dict=cmd_dict, srv_op_code_dict=cmd_dict,
@ -97,6 +109,25 @@ def pack_acu_commands(
hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX) hk_sid = make_sid(object_id=object_id.as_bytes, set_id=gs.SetIds.ACU_AUX)
command = generate_one_hk_command(sid=hk_sid, ssc=0) command = generate_one_hk_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.GET_PARAM:
tc_queue.appendleft(
(QueueCommands.PRINT, f"PDU1: {GsInfo.GET_PARAMETER}")
)
table_id = int(input("Specify table ID: "))
memory_address = int(input("Specify memory address: 0x"), 16)
parameter_size = int(input("Specify parameter size: "))
command = gs.pack_get_param_command(object_id.as_bytes, table_id, memory_address, parameter_size)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.SET_PARAM:
tc_queue.appendleft(
(QueueCommands.PRINT, f"PDU1: {GsInfo.SET_PARAMETER}")
)
memory_address = int(input("Specify memory address: 0x"), 16)
memory_address = struct.pack('!H', memory_address)
parameter_size = int(input("Specify parameter size: "))
parameter = int(input("Specify parameter: "))
command = gs.pack_set_param_command(object_id.as_bytes, memory_address, parameter_size, parameter)
tc_queue.appendleft(command.pack_command_tuple())
pack_test_cmds(object_id=object_id, tc_queue=tc_queue) pack_test_cmds(object_id=object_id, tc_queue=tc_queue)
return tc_queue return tc_queue

View File

@ -170,6 +170,9 @@ def add_pdu1_cmds(cmd_dict: ServiceOpCodeDictT):
add_op_code_entry( add_op_code_entry(
op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests" op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests"
) )
add_op_code_entry(
op_code_dict=op_code_dict, keys=GomspaceOpCodes.SET_PARAM, info="Set parameter"
)
add_service_op_code_entry( add_service_op_code_entry(
srv_op_code_dict=cmd_dict, srv_op_code_dict=cmd_dict,
name=CustomServiceList.PDU1.value, name=CustomServiceList.PDU1.value,

View File

@ -1,8 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""PDU2 is mounted on the X2 slot of the P60 dock """PDU1 is mounted on the X2 slot of the P60 dock
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
import gomspace.gomspace_common as gs
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
@ -292,3 +293,13 @@ def pack_pdu1_commands(object_id: ObjectId, tc_queue: TcQueueT, op_code: str):
Channel.off, Channel.off,
) )
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.SET_PARAM:
tc_queue.appendleft(
(QueueCommands.PRINT, f"PDU1: {Info.SET_PARAMETER}")
)
memory_address = int(input("Specify memory address: 0x"), 16)
memory_address = struct.pack('!H', memory_address)
parameter_size = int(input("Specify parameter size: "))
parameter = int(input("Specify parameter: "))
command = gs.pack_set_param_command(object_id.as_bytes, memory_address, parameter_size, parameter)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -4,6 +4,7 @@ from pus_tc.devs.imtq import ImtqActionIds
from pus_tc.devs.ploc_mpsoc import PlocReplyIds from pus_tc.devs.ploc_mpsoc import PlocReplyIds
from pus_tc.devs.ploc_supervisor import SupvActionIds from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds from pus_tc.devs.star_tracker import StarTrackerActionIds
from gomspace.gomspace_common import GomspaceDeviceActionIds
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked from tmtccmd.config.definitions import DataReplyUnpacked
from tmtccmd.tm import Service8FsfwTm from tmtccmd.tm import Service8FsfwTm
@ -36,6 +37,8 @@ def handle_action_reply(
return handle_supervisor_replies(action_id, printer, custom_data) return handle_supervisor_replies(action_id, printer, custom_data)
elif object_id.as_bytes == STAR_TRACKER_ID: elif object_id.as_bytes == STAR_TRACKER_ID:
return handle_startracker_replies(action_id, printer, custom_data) return handle_startracker_replies(action_id, printer, custom_data)
elif object_id.as_bytes == ACU_HANDLER_ID:
return handle_acu_replies(action_id, printer, custom_data)
def handle_imtq_replies( def handle_imtq_replies(
@ -122,3 +125,27 @@ def handle_startracker_replies(
print(content_list) print(content_list)
printer.file_logger.info(header_list) printer.file_logger.info(header_list)
printer.file_logger.info(content_list) printer.file_logger.info(content_list)
def handle_acu_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
if action_id == GomspaceDeviceActionIds.PARAM_GET:
header_list = [
"Gomspace action ID"
"Table ID",
"Memory Address",
"Payload length"
"Payload"
]
fmt_str = "!BBHH"
(action, table_id, address, payload_length) = struct.unpack(fmt_str, custom_data[:6])
content_list = [
action,
table_id,
"0x" + hex(address),
payload_length,
"0x" + custom_data[6:].hex(),
]
print(header_list)
print(content_list)

View File

@ -180,13 +180,13 @@ def handle_pdu_data(
current_list = [] current_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)): for idx in range(len(PDU1_CHANNELS_NAMES)):
current_list.append( current_list.append(
struct.unpack("!h", hk_data[current_idx : current_idx + 2])[0] struct.unpack("!h", hk_data[current_idx: current_idx + 2])[0]
) )
current_idx += 2 current_idx += 2
voltage_list = [] voltage_list = []
for idx in range(len(PDU1_CHANNELS_NAMES)): for idx in range(len(PDU1_CHANNELS_NAMES)):
voltage_list.append( voltage_list.append(
struct.unpack("!H", hk_data[current_idx : current_idx + 2])[0] struct.unpack("!H", hk_data[current_idx: current_idx + 2])[0]
) )
current_idx += 2 current_idx += 2
output_enb_list = [] output_enb_list = []
@ -205,7 +205,7 @@ def handle_pdu_data(
fmt_str = "!IBh" fmt_str = "!IBh"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
(boot_count, batt_mode, temperature) = struct.unpack( (boot_count, batt_mode, temperature) = struct.unpack(
fmt_str, hk_data[current_idx : current_idx + inc_len] fmt_str, hk_data[current_idx: current_idx + inc_len]
) )
info = ( info = (
f"Boot Count {boot_count} | Battery Mode {batt_mode} | " f"Boot Count {boot_count} | Battery Mode {batt_mode} | "

View File

@ -30,13 +30,13 @@ def handle_thermal_controller_hk_data(
"SENSOR_PLPCDU_HEATSPREADER": tm_data[13], "SENSOR_TCS_BOARD": tm_data[14], "SENSOR_PLPCDU_HEATSPREADER": tm_data[13], "SENSOR_TCS_BOARD": tm_data[14],
"SENSOR_MAGNETTORQUER": tm_data[15], "TMP1075 1": tm_data[16], "TMP1075 2": tm_data[17]} "SENSOR_MAGNETTORQUER": tm_data[15], "TMP1075 1": tm_data[16], "TMP1075 2": tm_data[17]}
print(parsed_data) # print(parsed_data)
tcp_server_sensor_temperatures.report_parsed_hk_data(object_id, set_id, parsed_data) tcp_server_sensor_temperatures.report_parsed_hk_data(object_id, set_id, parsed_data)
elif set_id == SetIds.DEVICE_TEMPERATURE_SET: elif set_id == SetIds.DEVICE_TEMPERATURE_SET:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog("Received device temperature data") pw.dlog("Received device temperature data")
fmt_str = "!fhhhhiiiifHHHffffffffffffff" fmt_str = "!fhhhhiiiifffhffffffffffffff"
tm_data = struct.unpack(fmt_str, hk_data[:94]) tm_data = struct.unpack(fmt_str, hk_data[:98])
parsed_data = { parsed_data = {
"Q7S_TEMPERATURE": tm_data[0], "Q7S_TEMPERATURE": tm_data[0],
"BATTERY_TEMPERATURE_1": tm_data[1], "BATTERY_TEMPERATURE_1": tm_data[1],
@ -66,7 +66,7 @@ def handle_thermal_controller_hk_data(
"MGM_1_TEMPERATURE": tm_data[25], "MGM_1_TEMPERATURE": tm_data[25],
"ADC_PL_PCDU_TEMPERATURE": tm_data[26], "ADC_PL_PCDU_TEMPERATURE": tm_data[26],
} }
print(parsed_data) # print(parsed_data)
tcp_server_device_temperatures.report_parsed_hk_data(object_id, set_id, parsed_data) tcp_server_device_temperatures.report_parsed_hk_data(object_id, set_id, parsed_data)
elif set_id == SetIds.SUS_TEMPERATURE_SET: elif set_id == SetIds.SUS_TEMPERATURE_SET:
pass pass