request diag packet

This commit is contained in:
Robin Müller 2022-04-05 19:49:42 +02:00
parent 5b2dfa55eb
commit 3f5d77a5e5
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
5 changed files with 40 additions and 50 deletions

View File

@ -7,7 +7,11 @@
""" """
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER from config.object_ids import P60_DOCK_HANDLER
@ -126,7 +130,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE: if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once"))
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK) hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK)
command = generate_one_hk_command(sid=hk_sid, ssc=0) command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft( tc_queue.appendleft(

View File

@ -126,41 +126,28 @@ def pack_syrlinks_command(
command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER) command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
<<<<<<< HEAD
if op_code == "15": if op_code == "15":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
(QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK") command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
)
command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "16": if op_code == "16":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
(QueueCommands.PRINT, "Syrlinks: Set waveform BPSK") command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
)
command = object_id + struct.pack('!I', CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "17": if op_code == "17":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
(QueueCommands.PRINT, "Syrlinks: Set second config") command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
)
command = object_id + struct.pack('!I', CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "18": if op_code == "18":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
(QueueCommands.PRINT, "Syrlinks: Enable debug printout") command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
)
command = object_id + struct.pack('!I', CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "19": if op_code == "19":
tc_queue.appendleft( tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
(QueueCommands.PRINT, "Syrlinks: Disable debug printout") command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
)
command = object_id + struct.pack('!I', CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
=======
>>>>>>> develop

View File

@ -26,9 +26,7 @@ def get_event_dict() -> EventDictT:
def handle_event_packet( def handle_event_packet(
raw_tm: bytes, raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
printer: FsfwTmTcPrinter,
file_logger: logging.Logger
) -> str: ) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm) tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm) printer.handle_long_tm_print(packet_if=tm, info_if=tm)

View File

@ -50,15 +50,11 @@ def pus_factory_hook(raw_tm_packet: bytes):
handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet) handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
elif service_type == 3: elif service_type == 3:
handle_hk_packet( handle_hk_packet(
printer=FSFW_PRINTER, printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
raw_tm=raw_tm_packet,
obj_id_dict=obj_id_dict
) )
elif service_type == 5: elif service_type == 5:
handle_event_packet( handle_event_packet(
raw_tm=raw_tm_packet, raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
printer=FSFW_PRINTER,
file_logger=file_logger
) )
elif service_type == 8: elif service_type == 8:
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet) tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet)

View File

@ -5,7 +5,11 @@ import datetime
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base, HkContentType, Service3FsfwTm from tmtccmd.tm.service_3_fsfw_housekeeping import (
Service3Base,
HkContentType,
Service3FsfwTm,
)
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds from pus_tc.devs.syrlinks_hk_handler import SetIds
@ -29,9 +33,7 @@ def handle_hk_packet(
obj_id_dict: ObjectIdDictT, obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
): ):
tm_packet = Service3FsfwTm.unpack( tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
raw_telemetry=raw_tm, custom_hk_handling=False
)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None: if named_obj_id is None:
named_obj_id = tm_packet.object_id named_obj_id = tm_packet.object_id
@ -387,14 +389,13 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH" fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
( (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
temperature, fmt_str, hk_data[0 : 0 + inc_len]
ps_voltage, )
pl_voltage, printout = (
tx_agc_value f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
) = struct.unpack(fmt_str, hk_data[0:0 + inc_len]) f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
printout = f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " \ )
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
log_to_both(printer, printout) log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
@ -486,8 +487,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
batt_current, batt_current,
batt_voltage, batt_voltage,
batt_temp_0, batt_temp_0,
batt_temp_1 batt_temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len current_idx += inc_len
device_types = [] device_types = []
device_statuses = [] device_statuses = []
@ -501,11 +502,15 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
current_idx += 1 current_idx += 1
wdt_reboots_list = [] wdt_reboots_list = []
for idx in range(5): for idx in range(5):
wdt_reboots_list.append(struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]) wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4 current_idx += 4
time_pings_left_list = [] time_pings_left_list = []
for idx in range(3): for idx in range(3):
time_pings_left_list.append(struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]) time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4 current_idx += 4
for idx in range(2): for idx in range(2):
time_pings_left_list.append(hk_data[current_idx]) time_pings_left_list.append(hk_data[current_idx])