Merge remote-tracking branch 'origin/mueller/major-tmtc-update' into mueller/master

This commit is contained in:
Robin Mueller 2022-04-06 12:12:14 +02:00
commit 8917f303ec
10 changed files with 112 additions and 81 deletions

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtccli.py" />
<option name="PARAMETERS" value="-s pdu1 -l -t 6" /> <option name="PARAMETERS" value="-s pdu1 -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />

View File

@ -13,8 +13,7 @@ from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import ( from tmtccmd.config.globals import (
set_default_globals_pre_args_parsing, set_default_globals_pre_args_parsing
set_default_globals_post_args_parsing,
) )
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
@ -34,12 +33,3 @@ def set_globals_pre_args_parsing(gui: bool = False):
tm_apid=PUS_APID, tm_apid=PUS_APID,
com_if_id=CoreComInterfaces.TCPIP_UDP.value, com_if_id=CoreComInterfaces.TCPIP_UDP.value,
) )
def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):
set_default_globals_post_args_parsing(
args=args,
custom_services_list=[CustomServiceList],
custom_modes_list=[CustomModeList],
json_cfg_path=json_cfg_path,
)

View File

@ -21,34 +21,15 @@ from config.retvals import get_retval_dict
class EiveHookObject(TmTcHookBase): class EiveHookObject(TmTcHookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict() service_op_code_dict = get_default_service_op_code_dict()
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict) get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
return service_op_code_dict return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""The user can specify a path and filename for the JSON configuration file by overriding
this function.
:return:
"""
return "config/tmtc_config.json"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(
args=args, json_cfg_path=self.get_json_config_file_path()
)
def assign_communication_interface( def assign_communication_interface(
self, com_if_key: str self, com_if_key: str
) -> Union[CommunicationInterface, None]: ) -> Union[CommunicationInterface, None]:
@ -56,7 +37,7 @@ class EiveHookObject(TmTcHookBase):
return create_communication_interface_default( return create_communication_interface_default(
com_if_key=com_if_key, com_if_key=com_if_key,
json_cfg_path=self.get_json_config_file_path(), json_cfg_path=self.json_cfg_path,
space_packet_ids=(0x0865,), space_packet_ids=(0x0865,),
) )
@ -328,6 +309,26 @@ def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
"Syrlinks Handler: Read LCL config register", "Syrlinks Handler: Read LCL config register",
{OpCodeDictKeys.TIMEOUT: 2.0}, {OpCodeDictKeys.TIMEOUT: 2.0},
), ),
"15": (
"Syrlinks Handler: Set waveform OQPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"16": (
"Syrlinks Handler: Set waveform BPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"17": (
"Syrlinks Handler: Set second config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"18": (
"Syrlinks Handler: Enable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"19": (
"Syrlinks Handler: Disable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
} }
service_syrlinks_handler_tuple = ( service_syrlinks_handler_tuple = (
"Syrlinks Handler", "Syrlinks Handler",

View File

@ -1,6 +1,6 @@
SW_NAME = "eive" SW_NAME = "eive"
VERSION_MAJOR = 1 VERSION_MAJOR = 1
VERSION_MINOR = 8 VERSION_MINOR = 9
VERSION_SUBMINOR = 0 VERSION_SUBMINOR = 0
__version__ = "1.8.0" __version__ = "1.9.0"

View File

@ -7,7 +7,11 @@
""" """
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid from tmtccmd.tc.service_3_housekeeping import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER from config.object_ids import P60_DOCK_HANDLER
@ -126,7 +130,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
if op_code in GomspaceOpCodes.REQUEST_HK_ONCE: if op_code in GomspaceOpCodes.REQUEST_HK_ONCE:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Requesting HK Table Once"))
hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK) hk_sid = make_sid(object_id=P60_DOCK_HANDLER, set_id=SetIds.P60_DOCK)
command = generate_one_hk_command(sid=hk_sid, ssc=0) command = generate_one_diag_command(sid=hk_sid, ssc=0)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
tc_queue.appendleft( tc_queue.appendleft(

View File

@ -20,16 +20,21 @@ class SetIds:
class CommandIds: class CommandIds:
SET_TX_MODE_STANDBY = bytearray([0x0, 0x0, 0x0, 0x3]) READ_RX_STATUS_REGISTERS = 2
SET_TX_MODE_MODULATION = bytearray([0x0, 0x0, 0x0, 0x4]) SET_TX_MODE_STANDBY = 3
SET_TX_MODE_CW = bytearray([0x0, 0x0, 0x0, 0x5]) SET_TX_MODE_MODULATION = 4
READ_TX_STATUS = bytearray([0x0, 0x0, 0x0, 0x7]) SET_TX_MODE_CW = 5
READ_TX_WAVEFORM = bytearray([0x0, 0x0, 0x0, 0x8]) READ_TX_STATUS = 7
READ_TX_AGC_VALUE_HIGH_BYTE = bytearray([0x0, 0x0, 0x0, 0x9]) READ_TX_WAVEFORM = 8
READ_TX_AGC_VALUE_LOW_BYTE = bytearray([0x0, 0x0, 0x0, 0x9]) READ_TX_AGC_VALUE_HIGH_BYTE = 9
READ_TX_AGC_VALUE_LOW_BYTE = 10
WRITE_LCL_CONFIG = 11 WRITE_LCL_CONFIG = 11
READ_LCL_CONFIG_REGISTER = 12 READ_LCL_CONFIG_REGISTER = 12
READ_RX_STATUS_REGISTERS = 2 SET_WAVEFORM_OQPSK = 17
SET_WAVEFORM_BPSK = 18
SET_SECOND_CONFIG = 19
ENABLE_DEBUG = 20
DISABLE_DEBUG = 21
def pack_syrlinks_command( def pack_syrlinks_command(
@ -121,3 +126,28 @@ def pack_syrlinks_command(
command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER) command = object_id + struct.pack("!I", CommandIds.READ_LCL_CONFIG_REGISTER)
command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command) command = PusTelecommand(service=8, subservice=128, ssc=19, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "15":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform OQPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_OQPSK)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set waveform BPSK"))
command = object_id + struct.pack("!I", CommandIds.SET_WAVEFORM_BPSK)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "17":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Set second config"))
command = object_id + struct.pack("!I", CommandIds.SET_SECOND_CONFIG)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "18":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Enable debug printout"))
command = object_id + struct.pack("!I", CommandIds.ENABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "19":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Disable debug printout"))
command = object_id + struct.pack("!I", CommandIds.DISABLE_DEBUG)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -26,9 +26,7 @@ def get_event_dict() -> EventDictT:
def handle_event_packet( def handle_event_packet(
raw_tm: bytes, raw_tm: bytes, printer: FsfwTmTcPrinter, file_logger: logging.Logger
printer: FsfwTmTcPrinter,
file_logger: logging.Logger
) -> str: ) -> str:
tm = Service5Tm.unpack(raw_telemetry=raw_tm) tm = Service5Tm.unpack(raw_telemetry=raw_tm)
printer.handle_long_tm_print(packet_if=tm, info_if=tm) printer.handle_long_tm_print(packet_if=tm, info_if=tm)

View File

@ -50,15 +50,11 @@ def pus_factory_hook(raw_tm_packet: bytes):
handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet) handle_service_1_packet(printer=FSFW_PRINTER, raw_tm=raw_tm_packet)
elif service_type == 3: elif service_type == 3:
handle_hk_packet( handle_hk_packet(
printer=FSFW_PRINTER, printer=FSFW_PRINTER, raw_tm=raw_tm_packet, obj_id_dict=obj_id_dict
raw_tm=raw_tm_packet,
obj_id_dict=obj_id_dict
) )
elif service_type == 5: elif service_type == 5:
handle_event_packet( handle_event_packet(
raw_tm=raw_tm_packet, raw_tm=raw_tm_packet, printer=FSFW_PRINTER, file_logger=file_logger
printer=FSFW_PRINTER,
file_logger=file_logger
) )
elif service_type == 8: elif service_type == 8:
tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet) tm_packet = Service8FsfwTm.unpack(raw_telemetry=raw_tm_packet)

View File

@ -5,7 +5,11 @@ import datetime
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.config.definitions import HkReplyUnpacked from tmtccmd.config.definitions import HkReplyUnpacked
from tmtccmd.tm.service_3_fsfw_housekeeping import Service3Base, HkContentType, Service3FsfwTm from tmtccmd.tm.service_3_fsfw_housekeeping import (
Service3Base,
HkContentType,
Service3FsfwTm,
)
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from pus_tc.devs.bpx_batt import BpxSetIds from pus_tc.devs.bpx_batt import BpxSetIds
from pus_tc.devs.syrlinks_hk_handler import SetIds from pus_tc.devs.syrlinks_hk_handler import SetIds
@ -29,9 +33,7 @@ def handle_hk_packet(
obj_id_dict: ObjectIdDictT, obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
): ):
tm_packet = Service3FsfwTm.unpack( tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
raw_telemetry=raw_tm, custom_hk_handling=False
)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes) named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
if named_obj_id is None: if named_obj_id is None:
named_obj_id = tm_packet.object_id named_obj_id = tm_packet.object_id
@ -387,14 +389,13 @@ def handle_core_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
fmt_str = "!fffH" fmt_str = "!fffH"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)
( (temperature, ps_voltage, pl_voltage, tx_agc_value) = struct.unpack(
temperature, fmt_str, hk_data[0 : 0 + inc_len]
ps_voltage, )
pl_voltage, printout = (
tx_agc_value f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | "
) = struct.unpack(fmt_str, hk_data[0:0 + inc_len])
printout = f"Chip Temperature [°C] {temperature} | PS Voltage [mV] {ps_voltage} | " \
f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}" f"PL Voltage [mV] {pl_voltage} | TX AGC {tx_agc_value}"
)
log_to_both(printer, printout) log_to_both(printer, printout)
printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4) printer.print_validity_buffer(validity_buffer=hk_data[inc_len:], num_vars=4)
@ -486,8 +487,8 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
batt_current, batt_current,
batt_voltage, batt_voltage,
batt_temp_0, batt_temp_0,
batt_temp_1 batt_temp_1,
) = struct.unpack(fmt_str, hk_data[current_idx: current_idx + inc_len]) ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
current_idx += inc_len current_idx += inc_len
device_types = [] device_types = []
device_statuses = [] device_statuses = []
@ -501,11 +502,15 @@ def handle_p60_hk_data(printer: FsfwTmTcPrinter, hk_data: bytes):
current_idx += 1 current_idx += 1
wdt_reboots_list = [] wdt_reboots_list = []
for idx in range(5): for idx in range(5):
wdt_reboots_list.append(struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]) wdt_reboots_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4 current_idx += 4
time_pings_left_list = [] time_pings_left_list = []
for idx in range(3): for idx in range(3):
time_pings_left_list.append(struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]) time_pings_left_list.append(
struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
)
current_idx += 4 current_idx += 4
for idx in range(2): for idx in range(2):
time_pings_left_list.append(hk_data[current_idx]) time_pings_left_list.append(hk_data[current_idx])

View File

@ -31,6 +31,9 @@ import traceback
try: try:
import tmtccmd.runner as tmtccmd import tmtccmd.runner as tmtccmd
from tmtccmd.config import default_json_path, SetupArgs
from tmtccmd.config.args import create_default_args_parser, add_default_tmtccmd_args, \
parse_default_input_arguments
from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler from tmtccmd.ccsds.handler import CcsdsTmHandler, ApidHandler
from tmtccmd.logging import init_console_logger from tmtccmd.logging import init_console_logger
except ImportError as error: except ImportError as error:
@ -61,21 +64,25 @@ from pus_tm.factory_hook import ccsds_tm_handler
def main(): def main():
print(f"-- eive tmtc version {__version__} --") print(f"-- eive tmtc version {__version__} --")
print(f"-- spacepackets version {spacepackets.__version__} --") print(f"-- spacepackets version {spacepackets.__version__} --")
hook_obj = EiveHookObject() tmtccmd.init_printout(False)
tmtccmd.init_tmtccmd(hook_object=hook_obj) hook_obj = EiveHookObject(json_cfg_path=default_json_path())
arg_parser = create_default_args_parser()
add_default_tmtccmd_args(arg_parser)
args = parse_default_input_arguments(arg_parser, hook_obj)
setup_args = SetupArgs(hook_obj=hook_obj, use_gui=False, apid=PUS_APID, cli_args=args)
apid_handler = ApidHandler(
cb=ccsds_tm_handler, queue_len=50, user_args=None
)
ccsds_handler = CcsdsTmHandler() ccsds_handler = CcsdsTmHandler()
init_console_logger() ccsds_handler.add_tm_handler(apid=PUS_APID, handler=apid_handler)
pus_handler = ApidHandler(cb=ccsds_tm_handler, queue_len=50, user_args=None) tmtccmd.setup(setup_args=setup_args)
ccsds_handler.add_tm_handler(apid=PUS_APID, handler=pus_handler)
tmtccmd.add_ccsds_handler(ccsds_handler) tmtccmd.add_ccsds_handler(ccsds_handler)
tmtccmd.setup_tmtccmd(use_gui=False, reduced_printout=False)
tmtc_backend = tmtccmd.get_default_tmtc_backend( tmtc_backend = tmtccmd.get_default_tmtc_backend(
hook_obj=hook_obj, setup_args=setup_args,
json_cfg_path=hook_obj.get_json_config_file_path(),
tm_handler=ccsds_handler, tm_handler=ccsds_handler,
) )
tmtc_backend.set_pre_send_cb(callable=pre_tc_send_cb, user_args=None) tmtc_backend.set_pre_send_cb(callable=pre_tc_send_cb, user_args=None)
tmtccmd.run_tmtccmd(False, tmtc_backend=tmtc_backend, run_setup=False) tmtccmd.run(tmtc_backend=tmtc_backend)
if __name__ == "__main__": if __name__ == "__main__":