fixed merge conflicts

This commit is contained in:
Jakob Meier 2021-06-11 13:51:56 +02:00
commit 58c20e31cc
33 changed files with 183 additions and 194 deletions

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 2 -c 2 --hk" />
<option name="PARAMETERS" value="-m listener --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 17 -t 3" />
<option name="PARAMETERS" value="-s 17 -o 0 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 200 -t 4" />
<option name="PARAMETERS" value="-s 200 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 2 -t 4" />
<option name="PARAMETERS" value="-s 2 -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 3 -p --hk" />
<option name="PARAMETERS" value="-s 3 -p --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s 8 -t 8" />
<option name="PARAMETERS" value="-s 8 -t 8" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -1,24 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Single Command UDP" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_core/tmtcc_runner.py" />
<option name="PARAMETERS" value="-m listener -c udp" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s acu --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-s acu -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s p60dock --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-s p60dock -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s pdu1 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l" />
<option name="PARAMETERS" value="-s pdu1 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m seqcmd -c udp -s pdu2 --boardIP 192.168.133.10 --clientIP 192.168.133.4 -l -t 4" />
<option name="PARAMETERS" value="-s pdu2 -l -t 4" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -6,7 +6,7 @@
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
LOGGER = get_logger()

View File

@ -8,6 +8,7 @@ import enum
class CustomServiceList(enum.Enum):
TEST_DEVICE = "test",
P60DOCK = "p60dock"
PDU1 = "pdu1"
PDU2 = "pdu2"
@ -17,3 +18,5 @@ class CustomServiceList(enum.Enum):
HEATER = "heater"
IMTQ = "imtq"
PLOC = "ploc"
PCDU = "pcdu",
SA_DEPLYOMENT = "sa_depl"

View File

@ -11,12 +11,10 @@ import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList
from config.custom_mode_op import CustomModeList
from tmtccmd.core.definitions import CoreComInterfaces, CoreServiceList
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing, \
set_default_globals_post_args_parsing, get_core_service_dict
from tmtccmd.core.globals_manager import update_global
from tmtccmd.core.definitions import CoreGlobalIds
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import set_default_globals_pre_args_parsing, \
set_default_globals_post_args_parsing
from tmtccmd.utility.logger import get_logger
LOGGER = get_logger()
@ -27,29 +25,11 @@ class CustomGlobalIds(enum.Enum):
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP)
servicelist = get_core_service_dict()
update_global(CoreGlobalIds.CURRENT_SERVICE, CoreServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICE_DICT, servicelist)
set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP.value)
def add_globals_post_args_parsing(args: argparse.Namespace):
def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):
set_default_globals_post_args_parsing(
args=args, custom_services_list=[CustomServiceList],
custom_modes_list=[CustomModeList],
custom_com_ifs_lists=None
custom_modes_list=[CustomModeList], json_cfg_path=json_cfg_path
)
def set_up_serial_cfg(com_if: CoreComInterfaces):
from tmtccmd.defaults.com_setup import default_serial_cfg_setup
default_serial_cfg_setup(com_if=com_if)
def set_up_ethernet_cfg():
from tmtccmd.defaults.com_setup import default_tcpip_udp_cfg_setup
default_tcpip_udp_cfg_setup()

View File

@ -1,18 +1,72 @@
import argparse
from typing import Union, Dict, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.core.hook_base import TmTcHookBase
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.config.globals import OpCodeDictKeys
from config.definitions import CustomServiceList
class EiveHookObject(TmTcHookBase):
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict()
op_code_dict_srv_acu = {
"0": ("ACU Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_acu_tuple = ("ACU Devices", op_code_dict_srv_acu)
op_code_dict_srv_tmp1075 = {
"0": ("TMP1075 Tests", {OpCodeDictKeys.TIMEOUT: 2.2}),
}
service_tmp1075_1_tuple = ("TMP1075 1", op_code_dict_srv_tmp1075)
service_tmp1075_2_tuple = ("TMP1075 2", op_code_dict_srv_tmp1075)
op_code_dict_srv_p60 = {
"0": ("P60 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_p60_tuple = ("P60 Device", op_code_dict_srv_p60)
op_code_dict_srv_pdu1 = {
"0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1)
op_code_dict_srv_pdu2 = {
"0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2)
op_code_dict_srv_heater = {
"0": ("Heater Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_heater_tuple = ("Heater Device", op_code_dict_srv_heater)
service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple
service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple
service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple
service_op_code_dict[CustomServiceList.P60DOCK.value] = service_p60_tuple
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu1_tuple
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""
The user can specify a path and filename for the JSON configuration file by overriding this function.
:return:
"""
return "config/tmtc_config.json"
def get_version(self) -> str:
from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR
return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}"
@ -23,12 +77,14 @@ class EiveHookObject(TmTcHookBase):
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args)
add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path()
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
@ -42,21 +98,13 @@ class EiveHookObject(TmTcHookBase):
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def set_object_ids(self) -> Dict[int, bytearray]:
from config.object_ids import set_object_ids
return set_object_ids()
def pack_total_service_queue(self) -> Union[None, TcQueueT]:
from pus_tc.tc_packer_hook import create_total_tc_queue_user
return create_total_tc_queue_user()
def command_preparation_hook(self) -> Union[None, PusTelecommand]:
from custom_hooks import command_preparation_hook
return command_preparation_hook()
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: int, action_id: int, custom_data: bytearray
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(

View File

@ -3,55 +3,36 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
from typing import Dict
PUS_SERVICE_17_ID = bytearray([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytearray([0x44, 0x00, 0xAF, 0xFE])
P60_DOCK_HANDLER = bytearray([0x44, 0x00, 0x00, 0x1])
PDU_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x2])
PDU_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x3])
ACU_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x4])
TMP_1075_1_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x5])
TMP_1075_2_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x6])
HEATER_ID = bytearray([0x54, 0x00, 0x00, 0x1])
PCDU_HANDLER_ID = bytearray([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT_ID = bytearray([0x44, 0x00, 0x10, 0x01])
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytes([0x44, 0x00, 0xAF, 0xFE])
P60_DOCK_HANDLER = bytes([0x44, 0x00, 0x00, 0x1])
PDU_1_HANDLER_ID = bytes([0x44, 0x00, 0x00, 0x2])
PDU_2_HANDLER_ID = bytes([0x44, 0x00, 0x00, 0x3])
ACU_HANDLER_ID = bytes([0x44, 0x00, 0x00, 0x4])
TMP_1075_1_HANDLER_ID = bytes([0x44, 0x00, 0x00, 0x5])
TMP_1075_2_HANDLER_ID = bytes([0x44, 0x00, 0x00, 0x6])
HEATER_ID = bytes([0x54, 0x00, 0x00, 0x1])
PCDU_HANDLER_ID = bytes([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x00, 0x10, 0x01])
SYRLINKS_HANDLER = bytes([0x44, 0x00, 0x10, 0x02])
IMTQ_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x14])
PLOC_ID = bytearray([0x44, 0x00, 0x00, 0x15])
class ObjIdIds(enum.IntEnum):
PUS_SERVICE_17_ID = 0
TEST_DEVICE_ID = 1
P60DOCK_HANDLER_ID = 2
PDU1_HANDLER_ID = 3
PDU2_HANDLER_ID = 4
PCDU_HANDLER_ID = 5
ACU_HANDLER_ID = 6
TMP1075_1_HANDLER_ID = 7
TMP1075_2_HANDLER_ID = 8
HEATER_ID = 9
SOLAR_ARRAY_DEPLOYMENT_ID = 10
IMTQ_HANDLER_ID = 11
PLOC_ID = 12
def set_object_ids() -> Dict[int, bytearray]:
def get_object_ids() -> Dict[bytes, list]:
object_id_dict = ({
ObjIdIds.PUS_SERVICE_17_ID: PUS_SERVICE_17_ID,
ObjIdIds.TEST_DEVICE_ID: TEST_DEVICE_ID,
ObjIdIds.P60DOCK_HANDLER_ID: P60_DOCK_HANDLER,
ObjIdIds.PDU1_HANDLER_ID: PDU_1_HANDLER_ID,
ObjIdIds.PDU2_HANDLER_ID: PDU_2_HANDLER_ID,
ObjIdIds.ACU_HANDLER_ID: ACU_HANDLER_ID,
ObjIdIds.TMP1075_1_HANDLER_ID: TMP_1075_1_HANDLER_ID,
ObjIdIds.TMP1075_2_HANDLER_ID: TMP_1075_2_HANDLER_ID,
ObjIdIds.HEATER_ID: HEATER_ID,
ObjIdIds.PCDU_HANDLER_ID: PCDU_HANDLER_ID,
ObjIdIds.SOLAR_ARRAY_DEPLOYMENT_ID: SOLAR_ARRAY_DEPLOYMENT_ID,
ObjIdIds.IMTQ_HANDLER_ID: IMTQ_HANDLER_ID,
ObjIdIds.PLOC_ID: PLOC_ID
PUS_SERVICE_17_ID: "PUS Service 17",
TEST_DEVICE_ID: "Test Device",
P60_DOCK_HANDLER: "P60",
PDU_1_HANDLER_ID: "PCDU PDU1 Handler",
PDU_2_HANDLER_ID: "PCDU PDU2 Handler",
ACU_HANDLER_ID: "ACU Handler",
TMP_1075_1_HANDLER_ID: "TMP 1075 Handler 1",
TMP_1075_2_HANDLER_ID: "TMP 1075 Handler 2",
HEATER_ID: "Heater",
PCDU_HANDLER_ID: "PCDU",
SOLAR_ARRAY_DEPLOYMENT_ID: "Solar Array Deployment",
})
return object_id_dict

View File

@ -1,4 +1,4 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 4
VERSION_MINOR = 5
VERSION_SUBMINOR = 0

View File

@ -8,11 +8,11 @@
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable
from config.object_ids import ObjIdIds
from tmtccmd.core.object_id_manager import get_object_id
from config.object_ids import P60_DOCK_HANDLER
class ACUTestProcedure:
@ -57,7 +57,7 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing ACU"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling ACU connected to X1 slot (channel 0)"))
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
p60dock_object_id = P60_DOCK_HANDLER
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)

View File

@ -5,7 +5,7 @@
@author J. Meier
@date 30.01.2021
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand

View File

@ -5,7 +5,7 @@
@author J. Meier
@date 13.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *

View File

@ -5,7 +5,7 @@
@author J. Meier
@date 17.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand

View File

@ -5,7 +5,7 @@
@author J. Meier
@date 17.12.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *

View File

@ -5,7 +5,7 @@
@author R. Mueller
@date 02.05.2020
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.pus_tc.service_200_mode import pack_mode_data

View File

@ -1,20 +1,20 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_solar_array_deployment.py
@file solar_array_deployment.py
@brief The test function in this file simply returns a command which triggers the solar array deployment.
@author J. Meier
@date 15.02.2021
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.packer import PusTelecommand
class ActionIds:
DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5])
def pack_solar_array_deployment_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
def pack_solar_array_deployment_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing S/A Deployment"))
command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS

View File

@ -1,15 +1,14 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_syrlinks_hk_handler.py
@file syrlinks_hk_handler.py
@brief Syrlinks Hk Handler tests
@author J. Meier
@date 13.12.2020
"""
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service_3_housekeeping import *
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command
class SetIds:
@ -17,7 +16,7 @@ class SetIds:
TX_REGISTERS_DATASET = 2
def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Get RX Registers"))
sid = make_sid(object_id, SetIds.RX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 200)
@ -27,5 +26,3 @@ def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT)
sid = make_sid(object_id, SetIds.TX_REGISTERS_DATASET)
command = generate_one_hk_command(sid, 201)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@ -8,12 +8,11 @@ import os
from collections import deque
from typing import Union
from tmtccmd.core.definitions import CoreServiceList
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.pus_tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from tmtccmd.core.object_id_manager import get_object_id
from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
@ -24,46 +23,52 @@ from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.ploc import pack_ploc_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import ObjIdIds
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_ID
LOGGER = get_logger()
def pack_service_queue_user(service: Union[str, int], op_code: str, service_queue: TcQueueT):
if service == CoreServiceList.SERVICE_5:
return pack_generic_service5_test_into(service_queue)
if service == CoreServiceList.SERVICE_17:
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=service_queue)
if service == CoreServiceList.SERVICE_17.value:
return service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
if service == CustomServiceList.P60DOCK.value:
object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_p60dock_test_into(object_id, service_queue)
object_id = P60_DOCK_HANDLER
return pack_p60dock_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.PDU1.value:
pdu1_object_id = get_object_id(ObjIdIds.PDU1_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu1_test_into(pdu1_object_id, p60dock_object_id, service_queue)
pdu1_object_id = PDU_1_HANDLER_ID
p60dock_object_id = P60_DOCK_HANDLER
return pack_pdu1_test_into(
pdu1_object_id=pdu1_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue
)
if service == CustomServiceList.PDU2.value:
pdu2_object_id = get_object_id(ObjIdIds.PDU2_HANDLER_ID)
p60dock_object_id = get_object_id(ObjIdIds.P60DOCK_HANDLER_ID)
return pack_pdu2_test_into(pdu2_object_id, p60dock_object_id, service_queue)
pdu2_object_id = PDU_2_HANDLER_ID
p60dock_object_id = P60_DOCK_HANDLER
return pack_pdu2_test_into(
pdu2_object_id=pdu2_object_id, p60dock_object_id=p60dock_object_id, tc_queue=service_queue
)
if service == CustomServiceList.ACU.value:
object_id = get_object_id(ObjIdIds.ACU_HANDLER_ID)
return pack_acu_test_into(object_id, service_queue)
object_id = ACU_HANDLER_ID
return pack_acu_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.TMP1075_1.value:
object_id = get_object_id(ObjIdIds.TMP1075_1_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
object_id = TMP_1075_1_HANDLER_ID
return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TMP1075_2.value:
object_id = get_object_id(ObjIdIds.TMP1075_2_HANDLER_ID)
return pack_tmp1075_test_into(object_id, service_queue)
object_id = TMP_1075_2_HANDLER_ID
return pack_tmp1075_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.HEATER.value:
object_id = get_object_id(ObjIdIds.HEATER_ID)
return pack_heater_test_into(object_id, service_queue)
object_id = HEATER_ID
return pack_heater_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.IMTQ.value:
object_id = get_object_id(ObjIdIds.IMTQ_HANDLER_ID)
return pack_imtq_test_into(object_id, service_queue)
object_id = IMTQ_HANDLER_ID
return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.PLOC.value:
object_id = get_object_id(ObjIdIds.PLOC_ID)
return pack_ploc_test_into(object_id, service_queue)
object_id = PLOC_ID
return pack_ploc_test_into(object_id=object_id, tc_queue=service_queue)
LOGGER.warning("Invalid Service !")

View File

@ -5,7 +5,7 @@
@author J. Meier
@date 06.01.2021
"""
from tmtccmd.core.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
@ -30,7 +30,7 @@ class Tmp1075ActionIds:
start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02])
def pack_tmp1075_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
def pack_tmp1075_test_into(object_id: bytearray, op_code: str, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing Tmp1075 Temperature Sensor Handler with object id: 0x" + object_id.hex())

View File

@ -5,7 +5,7 @@
"""
from tmtccmd.pus_tm.service_8_functional_cmd import Service8TM
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_3_base import Service3Base

View File

@ -7,7 +7,7 @@
from typing import Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
LOGGER = get_logger()

View File

@ -7,16 +7,14 @@
import struct
from typing import Tuple
from config.tmtcc_object_ids import ObjectIds
from tmtc_core.pus_tm.tmtcc_tm_service3_base import Service3Base
from tmtc_core.utility.tmtcc_logger import get_logger
from config.tmtcc_object_ids import ObjectIds
from pus_tc.tmtcc_tc_syrlinks_hk_handler import SetIds
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_logger
from pus_tc.syrlinks_hk_handler import SetIds
from config.object_ids import SYRLINKS_HANDLER
LOGGER = get_logger()
def handle_user_hk_packet(object_id: ObjectIds, set_id: int, hk_data: bytearray,
def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
@ -37,7 +35,7 @@ def handle_user_hk_packet(object_id: ObjectIds, set_id: int, hk_data: bytearray,
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
if object_id == ObjectIds.SYRLINKS_HK_HANDLER:
if object_id == SYRLINKS_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:

View File

@ -1,12 +1,12 @@
import struct
from typing import Tuple
from config.object_ids import ObjIdIds
from config.object_ids import *
from pus_tc.imtq import ImtqActionIds
from pus_tc.ploc import PlocReplyIds
def user_analyze_service_8_data(
object_id: int, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
object_id: bytes, action_id: int, custom_data: bytearray) -> Tuple[list, list]:
"""
This function is called by the TMTC core if a Service 8 data reply (subservice 130)
is received. The user can return a tuple of two lists, where the first list
@ -18,7 +18,7 @@ def user_analyze_service_8_data(
@param custom_data:
@return:
"""
if object_id == ObjIdIds.PDU2_HANDLER_ID.value:
if object_id == PDU_2_HANDLER_ID:
header_list = ['PDU2 Service 8 Reply']
data_string = str()
@ -28,9 +28,9 @@ def user_analyze_service_8_data(
data_string = data_string.rstrip(',')
data_string = data_string.rstrip()
content_list = [data_string]
elif object_id == ObjIdIds.IMTQ_HANDLER_ID:
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == ObjIdIds.PLOC_ID:
elif object_id == PLOC_ID:
return handle_ploc_replies(action_id, custom_data)
else:
header_list = []

@ -1 +1 @@
Subproject commit 1773f62856f6c430c380f4c256d54a7a0b8f0a75
Subproject commit 3f39a1ffa17e8c74cadc789b3f4714c4bbd5f75f

View File

@ -5,6 +5,7 @@ def parse_input_arguments_user(print_known_args: bool = False, print_unknown_arg
This function by default will build the default argument parser. A custom CLI parser can be
built in this function if required.
"""
from tmtccmd.defaults.args_parser import parse_default_input_arguments
parse_default_input_arguments(print_known_args=print_known_args,
print_unknown_args=print_unknown_args)
from tmtccmd.config.args import parse_default_input_arguments
parse_default_input_arguments(
print_known_args=print_known_args, print_unknown_args=print_unknown_args
)