updates for common_tmtc
This commit is contained in:
parent
22ea3eea92
commit
4b6ab8d00a
@ -1,13 +1,14 @@
|
|||||||
import argparse
|
import argparse
|
||||||
from typing import Dict, Tuple, Optional
|
from typing import Dict, Tuple, Optional
|
||||||
|
|
||||||
|
from spacepackets.ecss.conf import PusVersion
|
||||||
|
|
||||||
from tmtccmd.com_if.com_interface_base import CommunicationInterface
|
from tmtccmd.com_if.com_interface_base import CommunicationInterface
|
||||||
from tmtccmd.config.definitions import ServiceOpCodeDictT
|
from tmtccmd.config.definitions import ServiceOpCodeDictT
|
||||||
from tmtccmd.config.hook import TmTcHookBase
|
from tmtccmd.config.hook import TmTcHookBase
|
||||||
from tmtccmd.core.backend import TmTcHandler
|
from tmtccmd.core.backend import TmTcHandler
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tm.service_3_base import Service3Base
|
from tmtccmd.tm.service_3_base import Service3Base
|
||||||
from tmtccmd.ecss.conf import PusVersion
|
|
||||||
from tmtccmd.utility.tmtc_printer import TmTcPrinter
|
from tmtccmd.utility.tmtc_printer import TmTcPrinter
|
||||||
|
|
||||||
from common_tmtc.config.definitions import PUS_APID
|
from common_tmtc.config.definitions import PUS_APID
|
||||||
@ -25,7 +26,8 @@ class FsfwHookBase(TmTcHookBase):
|
|||||||
def add_globals_pre_args_parsing(self, gui: bool = False):
|
def add_globals_pre_args_parsing(self, gui: bool = False):
|
||||||
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
|
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
|
||||||
set_default_globals_pre_args_parsing(
|
set_default_globals_pre_args_parsing(
|
||||||
gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C, apid=PUS_APID
|
gui=gui, pus_tm_version=PusVersion.PUS_C, pus_tc_version=PusVersion.PUS_C,
|
||||||
|
tc_apid=PUS_APID, tm_apid=PUS_APID
|
||||||
)
|
)
|
||||||
|
|
||||||
def add_globals_post_args_parsing(self, args: argparse.Namespace):
|
def add_globals_post_args_parsing(self, args: argparse.Namespace):
|
||||||
@ -40,7 +42,7 @@ class FsfwHookBase(TmTcHookBase):
|
|||||||
from tmtccmd.config.com_if import create_communication_interface_default
|
from tmtccmd.config.com_if import create_communication_interface_default
|
||||||
return create_communication_interface_default(
|
return create_communication_interface_default(
|
||||||
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
|
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
|
||||||
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x08ef
|
json_cfg_path=self.get_json_config_file_path(), space_packet_ids=(0x08ef,)
|
||||||
)
|
)
|
||||||
|
|
||||||
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
|
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tc.service_17_test import pack_service17_ping_command, pack_generic_service17_test
|
from tmtccmd.pus.service_17_test import pack_service_17_ping_command, pack_generic_service17_test
|
||||||
|
|
||||||
|
|
||||||
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
|
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
tc_queue.appendleft(pack_service17_ping_command(ssc=init_ssc).pack_command_tuple())
|
tc_queue.appendleft(pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple())
|
||||||
else:
|
else:
|
||||||
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)
|
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)
|
||||||
|
@ -5,10 +5,11 @@
|
|||||||
@author R. Mueller
|
@author R. Mueller
|
||||||
@date 02.05.2020
|
@date 02.05.2020
|
||||||
"""
|
"""
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
|
||||||
from tmtccmd.tc.packer import TcQueueT
|
from tmtccmd.tc.packer import TcQueueT
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data
|
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
|
||||||
|
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
|
|
||||||
@ -25,25 +26,25 @@ def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
|
|||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
# Set On Mode
|
# Set On Mode
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
|
||||||
mode_data = pack_mode_data(object_id, 1, 0)
|
mode_data = pack_mode_data(object_id, Modes.ON, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
new_ssc += 1
|
new_ssc += 1
|
||||||
# Set Normal mode
|
# Set Normal mode
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
|
||||||
mode_data = pack_mode_data(object_id, 2, 0)
|
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
new_ssc += 1
|
new_ssc += 1
|
||||||
# Set Raw Mode
|
# Set Raw Mode
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
|
||||||
mode_data = pack_mode_data(object_id, 3, 0)
|
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
new_ssc += 1
|
new_ssc += 1
|
||||||
# Set Off Mode
|
# Set Off Mode
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
|
||||||
mode_data = pack_mode_data(object_id, 0, 0)
|
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
new_ssc += 1
|
new_ssc += 1
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import struct
|
import struct
|
||||||
|
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tc.service_20_parameter import pack_type_and_matrix_data, pack_parameter_id
|
from tmtccmd.tc.service_20_parameter import pack_type_and_matrix_data, pack_parameter_id
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data
|
from tmtccmd.tc.service_200_mode import pack_mode_data
|
||||||
|
@ -7,12 +7,14 @@
|
|||||||
"""
|
"""
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
|
from tmtccmd.tc.service_200_mode import Modes
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data
|
from tmtccmd.tc.service_200_mode import pack_mode_data
|
||||||
|
|
||||||
from common_tmtc import pus_tc as cmd_data
|
from common_tmtc.pus_tc import command_data as cmd_data
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
|
|
||||||
|
|
||||||
@ -28,7 +30,7 @@ def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
|
|||||||
object_id = TEST_DEVICE_0_ID # dummy device
|
object_id = TEST_DEVICE_0_ID # dummy device
|
||||||
# Set Raw Mode
|
# Set Raw Mode
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
|
||||||
mode_data = pack_mode_data(object_id, 3, 0)
|
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
new_ssc += 1
|
new_ssc += 1
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
from tmtccmd.config.definitions import QueueCommands
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data
|
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
|
||||||
from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command
|
from tmtccmd.tc.service_20_parameter import pack_boolean_parameter_command
|
||||||
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command, \
|
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command, \
|
||||||
Srv3Subservice
|
Srv3Subservice
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
|
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
|
||||||
|
|
||||||
@ -22,7 +23,6 @@ PARAM_ACTIVATE_CHANGING_DATASETS = 4
|
|||||||
|
|
||||||
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
|
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
|
||||||
current_ssc = 3000
|
current_ssc = 3000
|
||||||
# TODO: Import this from config instead
|
|
||||||
device_idx = 0
|
device_idx = 0
|
||||||
if device_idx == 0:
|
if device_idx == 0:
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
@ -90,7 +90,7 @@ def pack_housekeeping_basic_test(
|
|||||||
if enable_normal_mode:
|
if enable_normal_mode:
|
||||||
# Set mode normal so that sets are changed/read regularly
|
# Set mode normal so that sets are changed/read regularly
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
|
||||||
mode_data = pack_mode_data(object_id, 2, 0)
|
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
|
||||||
current_ssc += 1
|
current_ssc += 1
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
@ -140,7 +140,7 @@ def pack_notification_basic_test(tc_queue: TcQueueT, object_id: bytearray, init_
|
|||||||
if enable_normal_mode:
|
if enable_normal_mode:
|
||||||
# Set mode normal so that sets are changed/read regularly
|
# Set mode normal so that sets are changed/read regularly
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
|
||||||
mode_data = pack_mode_data(object_id, 2, 0)
|
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=current_ssc, app_data=mode_data)
|
||||||
current_ssc += 1
|
current_ssc += 1
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
from tmtccmd.config.definitions import QueueCommands
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
from tmtccmd.ecss.tc import PusTelecommand
|
|
||||||
from tmtccmd.tc.definitions import TcQueueT
|
|
||||||
from tmtccmd.tc.service_200_mode import pack_mode_data
|
|
||||||
|
|
||||||
from common_tmtc import pus_tc as cmd_data
|
from tmtccmd.config.definitions import QueueCommands
|
||||||
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
|
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
|
||||||
|
|
||||||
|
import common_tmtc.pus_tc.command_data as cmd_data
|
||||||
|
|
||||||
|
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
@ -22,13 +23,13 @@ def pack_generic_service_8_test_into(tc_queue: TcQueueT):
|
|||||||
|
|
||||||
# set mode on
|
# set mode on
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
|
||||||
mode_data = pack_mode_data(object_id, 1, 0)
|
mode_data = pack_mode_data(object_id, Modes.ON, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
# set mode normal
|
# set mode normal
|
||||||
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
|
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
|
||||||
mode_data = pack_mode_data(object_id, 2, 0)
|
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
|
||||||
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
|
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
|
||||||
tc_queue.appendleft(command.pack_command_tuple())
|
tc_queue.appendleft(command.pack_command_tuple())
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ from tmtccmd.utility.logger import get_console_logger
|
|||||||
from tmtccmd.tc.definitions import TcQueueT
|
from tmtccmd.tc.definitions import TcQueueT
|
||||||
from tmtccmd.config.definitions import CoreServiceList
|
from tmtccmd.config.definitions import CoreServiceList
|
||||||
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
|
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
|
||||||
from tmtccmd.tc.service_17_test import pack_generic_service17_test
|
from tmtccmd.pus.service_17_test import pack_generic_service17_test
|
||||||
from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into
|
from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into
|
||||||
|
|
||||||
LOGGER = get_console_logger()
|
LOGGER = get_console_logger()
|
||||||
|
@ -3,15 +3,15 @@
|
|||||||
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
|
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
|
||||||
it to your needs.
|
it to your needs.
|
||||||
"""
|
"""
|
||||||
|
from spacepackets.ecss.tm import PusTelemetry
|
||||||
|
|
||||||
from tmtccmd.ecss.tm import PusTelemetry
|
|
||||||
from tmtccmd.utility.logger import get_console_logger
|
from tmtccmd.utility.logger import get_console_logger
|
||||||
from tmtccmd.tm.service_1_verification import Service1TM
|
from tmtccmd.pus.service_1_verification import Service1TMExtended
|
||||||
from tmtccmd.tm.service_2_raw_cmd import Service2TM
|
from tmtccmd.tm.service_2_raw_cmd import Service2TM
|
||||||
from tmtccmd.tm.service_3_housekeeping import Service3TM
|
from tmtccmd.tm.service_3_housekeeping import Service3TM
|
||||||
from tmtccmd.tm.service_5_event import Service5TM
|
from tmtccmd.tm.service_5_event import Service5TM
|
||||||
from tmtccmd.tm.service_8_functional_cmd import Service8TM
|
from tmtccmd.tm.service_8_functional_cmd import Service8TM
|
||||||
from tmtccmd.tm.service_17_test import Service17TM
|
from tmtccmd.pus.service_17_test import Service17TMExtended
|
||||||
from tmtccmd.tm.service_20_parameters import Service20TM
|
from tmtccmd.tm.service_20_parameters import Service20TM
|
||||||
from tmtccmd.tm.service_200_mode import Service200TM
|
from tmtccmd.tm.service_200_mode import Service200TM
|
||||||
from tmtccmd.utility.tmtc_printer import TmTcPrinter
|
from tmtccmd.utility.tmtc_printer import TmTcPrinter
|
||||||
@ -31,7 +31,7 @@ def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
|
|||||||
service_type = raw_tm_packet[7]
|
service_type = raw_tm_packet[7]
|
||||||
tm_packet = None
|
tm_packet = None
|
||||||
if service_type == 1:
|
if service_type == 1:
|
||||||
tm_packet = Service1TM.unpack(raw_tm_packet)
|
tm_packet = Service1TMExtended.unpack(raw_tm_packet)
|
||||||
if service_type == 2:
|
if service_type == 2:
|
||||||
tm_packet = Service2TM.unpack(raw_tm_packet)
|
tm_packet = Service2TM.unpack(raw_tm_packet)
|
||||||
if service_type == 3:
|
if service_type == 3:
|
||||||
@ -41,7 +41,7 @@ def pus_packet_factory(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
|
|||||||
if service_type == 5:
|
if service_type == 5:
|
||||||
tm_packet = Service5TM.unpack(raw_tm_packet)
|
tm_packet = Service5TM.unpack(raw_tm_packet)
|
||||||
if service_type == 17:
|
if service_type == 17:
|
||||||
tm_packet = Service17TM.unpack(raw_tm_packet)
|
tm_packet = Service17TMExtended.unpack(raw_tm_packet)
|
||||||
if service_type == 20:
|
if service_type == 20:
|
||||||
tm_packet = Service20TM.unpack(raw_tm_packet)
|
tm_packet = Service20TM.unpack(raw_tm_packet)
|
||||||
if service_type == 200:
|
if service_type == 200:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user