Merge pull request 'Update to v1.6.1' (#5) from develop into master

Reviewed-on: eive/eive_tmtc#5
This commit is contained in:
Robin Müller 2021-06-21 17:41:58 +02:00
commit 0eb6736692
20 changed files with 589 additions and 121 deletions

10
.gitignore vendored
View File

@ -4,4 +4,12 @@ log
.idea/*
!.idea/runConfigurations
*.json
*.json
/Lib
/Scripts
/pyvenv.cfg
/bin
/lib
/lib64
/share

View File

@ -1,8 +1,46 @@
### How to use this folder
# TMTC Commander EIVE
This folder contains template files to set up the TMTC commander
for a new mission or project. These files are the adaption
point to customize the TMTC commander.
# Set up virtual environment
## Linux
1. Create virtual environment
```sh
python3 -m venv .
```
2. Activate virtual environment
```sh
./Scripts/activate
```
3. Install `tmtccmd` for virtual environment. `-e` for interactive installation.
```sh
cd tmtccmd
python3 -m pip install -e .[gui]
```
## Windows
1. Create virtual environment
```sh
py -m venv .
```
2. Activate virtual environment
```sh
Scripts\activate.bat
```
3. Install `tmtccmd` for virtual environment. `-e` for interactive installation.
```sh
cd tmtccmd
py -m pip install -e .[gui]
```
To do so, simply copy all folder inside the TMTC commander root. This
step is also required because the TMTC commander core will load some modules.

View File

@ -7,6 +7,9 @@
import enum
PUS_APID = 0x65
class CustomServiceList(enum.Enum):
TEST_DEVICE = "test",
P60DOCK = "p60dock"
@ -15,6 +18,8 @@ class CustomServiceList(enum.Enum):
ACU = "acu"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater",
HEATER = "heater"
IMTQ = "imtq"
PLOC = "ploc"
PCDU = "pcdu",
SA_DEPLYOMENT = "sa_depl"

View File

@ -9,7 +9,7 @@ import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList
from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import set_default_globals_pre_args_parsing, \
@ -25,7 +25,8 @@ class CustomGlobalIds(enum.Enum):
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=0x65, com_if_id=CoreComInterfaces.TCPIP_UDP.value)
set_default_globals_pre_args_parsing(
gui=gui, apid=PUS_APID, com_if_id=CoreComInterfaces.TCPIP_UDP.value)
def add_globals_post_args_parsing(args: argparse.Namespace, json_cfg_path: str):

View File

@ -50,6 +50,19 @@ class EiveHookObject(TmTcHookBase):
}
service_heater_tuple = ("Heater Device", op_code_dict_srv_heater)
op_code_dict_srv_imtq = {
"0": ("IMTQ Tests All", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq)
service_op_code_dict[CustomServiceList.ACU.value] = service_acu_tuple
service_op_code_dict[CustomServiceList.TMP1075_1.value] = service_tmp1075_1_tuple
service_op_code_dict[CustomServiceList.TMP1075_2.value] = service_tmp1075_2_tuple
@ -58,6 +71,7 @@ class EiveHookObject(TmTcHookBase):
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
service_op_code_dict[CustomServiceList.PDU1.value] = service_pdu2_tuple
service_op_code_dict[CustomServiceList.HEATER.value] = service_heater_tuple
service_op_code_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
return service_op_code_dict
def get_json_config_file_path(self) -> str:
@ -83,7 +97,8 @@ class EiveHookObject(TmTcHookBase):
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path()
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path()
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
@ -113,7 +128,7 @@ class EiveHookObject(TmTcHookBase):
@staticmethod
def handle_service_3_housekeeping(
object_id: int, set_id: int, hk_data: bytearray, service3_packet: Service3Base
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(

View File

@ -17,6 +17,8 @@ HEATER_ID = bytes([0x54, 0x00, 0x00, 0x1])
PCDU_HANDLER_ID = bytes([0x44, 0x00, 0x10, 0x00])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x00, 0x10, 0x01])
SYRLINKS_HANDLER = bytes([0x44, 0x00, 0x10, 0x02])
IMTQ_HANDLER_ID = bytearray([0x44, 0x00, 0x00, 0x14])
PLOC_ID = bytearray([0x44, 0x00, 0x00, 0x15])
def get_object_ids() -> Dict[bytes, list]:

View File

@ -1,4 +1,4 @@
SW_NAME = "eive"
VERSION_MAJOR = 1
VERSION_MINOR = 5
VERSION_SUBMINOR = 0
VERSION_MINOR = 6
VERSION_SUBMINOR = 1

View File

@ -6,6 +6,11 @@ class PDUConfigTable:
out_en_1 = TableEntry(bytearray([0x00, 0x49]), TableEntry.uint8_size)
out_en_2 = TableEntry(bytearray([0x00, 0x4A]), TableEntry.uint8_size)
out_en_3 = TableEntry(bytearray([0x00, 0x4B]), TableEntry.uint8_size)
out_en_4 = TableEntry(bytearray([0x00, 0x4C]), TableEntry.uint8_size)
out_en_5 = TableEntry(bytearray([0x00, 0x4D]), TableEntry.uint8_size)
out_en_6 = TableEntry(bytearray([0x00, 0x4E]), TableEntry.uint8_size)
out_en_7 = TableEntry(bytearray([0x00, 0x4F]), TableEntry.uint8_size)
out_en_8 = TableEntry(bytearray([0x00, 0x50]), TableEntry.uint8_size)
# When channel consumes more than cur_lu_lim, channel is turned of immediately
cur_lu_lim_0 = TableEntry(bytearray([0x00, 0xB8]), TableEntry.uint16_size)

196
pus_tc/imtq.py Normal file
View File

@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
"""
@file imtq.py
@brief Tests for the ISIS IMTQ (Magnettorquer) device handler
@author J. Meier
@date 25.03.2021
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tc.service_3_housekeeping import make_sid, generate_one_hk_command
class ImtqTestProcedure:
"""
@brief Use this class to define the tests to perform for the IMTQ Handler.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
command_dipole = False
get_commanded_dipole = False
positive_x_test = True
negative_x_test = False
positive_y_test = False
negative_y_test = False
positive_z_test = False
negative_z_test = False
class ImtqSetIds:
ENG_HK_SET = 1
CAL_MTM_SET = 2
RAW_MTM_SET = 3
POSITIVE_X_TEST = 4
NEGATIVE_X_TEST = 5
POSITIVE_Y_TEST = 6
NEGATIVE_Y_TEST = 7
POSITIVE_Z_TEST = 8
NEGATIVE_Z_TEST = 9
class ImtqActionIds:
start_actuation_dipole = bytearray([0x0, 0x0, 0x0, 0x02])
get_commanded_dipole = bytearray([0x0, 0x0, 0x0, 0x03])
perform_positive_x_test = bytearray([0x0, 0x0, 0x0, 0x07])
perform_negative_x_test = bytearray([0x0, 0x0, 0x0, 0x08])
perform_positive_y_test = bytearray([0x0, 0x0, 0x0, 0x09])
perform_negative_y_test = bytearray([0x0, 0x0, 0x0, 0x0A])
perform_positive_z_test = bytearray([0x0, 0x0, 0x0, 0x0B])
perform_negative_z_test = bytearray([0x0, 0x0, 0x0, 0x0C])
# Initiates the reading of the last performed self test. After sending this command the results can be downlinked
# via the housekeeping service by using the appropriate set ids listed above.
read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D])
def pack_imtq_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing ISIS IMTQ handler with object id: 0x" + object_id.hex())
)
if op_code == "0" or op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive x self test"))
command = object_id + ImtqActionIds.perform_positive_x_test
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive x self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive x self test results"))
sid = make_sid(object_id, ImtqSetIds.POSITIVE_X_TEST)
command = generate_one_hk_command(sid, 24)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative x self test"))
command = object_id + ImtqActionIds.perform_negative_x_test
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative x self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative x self test results"))
sid = make_sid(object_id, ImtqSetIds.NEGATIVE_X_TEST)
command = generate_one_hk_command(sid, 27)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive y self test"))
command = object_id + ImtqActionIds.perform_positive_y_test
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive y self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive y self test results"))
sid = make_sid(object_id, ImtqSetIds.POSITIVE_Y_TEST)
command = generate_one_hk_command(sid, 30)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative y self test"))
command = object_id + ImtqActionIds.perform_negative_y_test
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative y self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative y self test results"))
sid = make_sid(object_id, ImtqSetIds.NEGATIVE_Y_TEST)
command = generate_one_hk_command(sid, 33)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform positive z self test"))
command = object_id + ImtqActionIds.perform_positive_z_test
command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of positive z self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with positive z self test results"))
sid = make_sid(object_id, ImtqSetIds.POSITIVE_Y_TEST)
command = generate_one_hk_command(sid, 36)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "6":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Perform negative z self test"))
command = object_id + ImtqActionIds.perform_negative_z_test
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Initiate reading of negative z self test results"))
command = object_id + ImtqActionIds.read_self_test_results
command = PusTelecommand(service=8, subservice=128, ssc=36, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Request dataset with negative z self test results"))
sid = make_sid(object_id, ImtqSetIds.NEGATIVE_Z_TEST)
command = generate_one_hk_command(sid, 37)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "7":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Commanding dipole"))
x_dipole = 0
y_dipole = 0
z_dipole = 0
duration = 0 # ms
command = pack_dipole_command(object_id, x_dipole, y_dipole, z_dipole, duration)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "0" or op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "IMTQ: Get commanded dipole"))
command = object_id + ImtqActionIds.get_commanded_dipole
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
def pack_dipole_command(object_id: bytearray, x_dipole: int, y_dipole: int, z_dipole: int, duration: int) -> bytearray:
""" This function packs the command causing the ISIS IMTQ to generate a dipole.
@param object_id The object id of the gomspace device handler.
@param x_dipole The dipole of the x coil in 10^-4*Am^2 (max. 2000)
@param y_dipole The dipole of the y coil in 10^-4*Am^2 (max. 2000)
@param z_dipole The dipole of the z coil in 10^-4*Am^2 (max. 2000)
@param duration The duration in milliseconds the dipole will be generated by the coils.
When set to 0, the dipole will be generated until a new dipole actuation
command is sent.
"""
action_id = ImtqActionIds.start_actuation_dipole
command = bytearray()
command = object_id + action_id
command.extend(x_dipole.to_bytes(length=2, byteorder='big'))
command.extend(y_dipole.to_bytes(length=2, byteorder='big'))
command.extend(z_dipole.to_bytes(length=2, byteorder='big'))
command.extend(duration.to_bytes(length=2, byteorder='big'))
return command

View File

@ -24,6 +24,10 @@ class PDU1TestProcedure:
reboot = False
ping = False
read_temperature = False
turn_channel_2_on = False # Star Tracker connected to this channel (5V)
turn_channel_2_off = False
turn_channel_3_on = False # MTQ connected to this channel (5V)
turn_channel_3_off = True
def pack_pdu1_test_into(
@ -31,14 +35,6 @@ def pack_pdu1_test_into(
):
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU1"))
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Enabling PDU1"))
command = pack_set_param_command(
p60dock_object_id, P60DockConfigTable.out_en_1.parameter_address,
P60DockConfigTable.out_en_1.parameter_size, Channel.on
)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
@ -53,3 +49,27 @@ def pack_pdu1_test_into(
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)"))
command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)"))
command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)"))
command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)"))
command = pack_set_param_command(pdu1_object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -101,4 +101,5 @@ def pack_pdu2_test_into(pdu2_object_id: bytearray, p60dock_object_id: bytearray,
command = pack_request_full_hk_table_command(pdu2_object_id)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

67
pus_tc/ploc.py Normal file
View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
@file ploc.py
@brief TMP1075 tests
@author J. Meier
@date 06.01.2021
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.pus_tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
class PlocTestProcedure:
"""
@brief Use this class to define the tests to perform for the PLOC.
@details Setting all to True will run all tests.
Setting all to False will only run the tests set to True.
"""
all = False
test_tc_mem_write = False
test_tc_mem_read = True
class PlocActionIds:
tc_mem_write = bytearray([0x0, 0x0, 0x0, 0x1])
tc_mem_read = bytearray([0x0, 0x0, 0x0, 0x2])
class PlocReplyIds:
tm_mem_read_report = 6
def pack_ploc_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(
(QueueCommands.PRINT,
"Testing PLOC Handler with object id: 0x" + object_id.hex())
)
if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_write:
tc_queue.appendleft((QueueCommands.PRINT, "PLOC: TC Mem Write Test"))
memory_address = int(input("PLOC Tc Mem Write: Type memory address: 0x"), 16)
memory_data = int(input("PLOC Tc Mem Write: Type memory data: 0x"), 16)
command = generate_write_mem_command(object_id, struct.pack('!I', memory_address), memory_data)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PlocTestProcedure.all or PlocTestProcedure.test_tc_mem_read:
tc_queue.appendleft((QueueCommands.PRINT, "PLOC: TC Mem Read Test"))
memory_address = int(input("PLOC Tc Mem Read: Type memory address: 0x"), 16)
command = object_id + PlocActionIds.tc_mem_read + struct.pack('!I', memory_address)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
def generate_write_mem_command(object_id: bytearray, memory_address: bytearray, memory_data: int) -> bytearray:
""" This function generates the command to write to a memory address within the PLOC
@param object_id The object id of the PlocHandler
@param memory_address The PLOC memory address where to write to.
@param memory_data The data to write to the memory address specified by the bytearray memory_address.
"""
command = object_id + PlocActionIds.tc_mem_write + memory_address + struct.pack('!I', memory_data)
return command

View File

@ -18,11 +18,13 @@ from pus_tc.p60dock import pack_p60dock_test_into
from pus_tc.pdu2 import pack_pdu2_test_into
from pus_tc.pdu1 import pack_pdu1_test_into
from pus_tc.acu import pack_acu_test_into
from pus_tc.imtq import pack_imtq_test_into
from pus_tc.tmp1075 import pack_tmp1075_test_into
from pus_tc.ploc import pack_ploc_test_into
from pus_tc.heater import pack_heater_test_into
from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_ID
LOGGER = get_logger()
@ -60,6 +62,13 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
if service == CustomServiceList.HEATER.value:
object_id = HEATER_ID
return pack_heater_test_into(object_id=object_id, tc_queue=service_queue)
if service == CustomServiceList.IMTQ.value:
object_id = IMTQ_HANDLER_ID
return pack_imtq_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC.value:
object_id = PLOC_ID
return pack_ploc_test_into(object_id=object_id, tc_queue=service_queue)
LOGGER.warning("Invalid Service !")

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_tmp1075.py
@file tmp1075.py
@brief TMP1075 tests
@author J. Meier
@date 06.01.2021

View File

@ -8,25 +8,35 @@ from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.utility.logger import get_logger
from tmtccmd.pus_tm.service_1_verification import Service1TM
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.pus_tm.service_3_housekeeping import Service3TM
from tmtccmd.pus_tm.service_5_event import Service5TM
from tmtccmd.pus_tm.service_17_test import Service17TM
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from config.definitions import PUS_APID
LOGGER = get_logger()
def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
def ccsds_tm_handler(apid: int, raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter) -> None:
if apid == PUS_APID:
pus_factory_hook(raw_tm_packet=raw_tm_packet, tmtc_printer=tmtc_printer)
def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
service_type = raw_tm_packet[7]
tm_packet = None
if service_type == 1:
return Service1TM(raw_tm_packet)
tm_packet = Service1TM(raw_tm_packet)
if service_type == 3:
return Service3Base(raw_tm_packet)
tm_packet = Service3TM(raw_tm_packet)
if service_type == 5:
return Service5TM(raw_tm_packet)
tm_packet = Service5TM(raw_tm_packet)
if service_type == 8:
service_8_tm = Service8TM(raw_tm_packet)
return Service8TM(raw_tm_packet)
tm_packet = Service8TM(raw_tm_packet)
if service_type == 17:
return Service17TM(raw_tm_packet)
LOGGER.info("The service " + str(service_type) + " is not implemented in Telemetry Factory")
return PusTelemetry(raw_tm_packet)
tm_packet = Service17TM(raw_tm_packet)
if tm_packet is None:
LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory')
tm_packet = PusTelemetry(raw_tm_packet)
tmtc_printer.print_telemetry(packet=tm_packet)

View File

@ -4,19 +4,29 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import struct
from typing import Tuple
from tmtccmd.pus_tm.service_3_base import Service3Base
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_logger
from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds
from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID
LOGGER = get_logger()
def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray,
def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@ -26,5 +36,122 @@ def handle_user_hk_packet(object_id: int, set_id: int, hk_data: bytearray,
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
if object_id == SYRLINKS_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
elif object_id == IMTQ_HANDLER_ID:
if (set_id >= ImtqSetIds.POSITIVE_X_TEST) and (set_id <= ImtqSetIds.NEGATIVE_Z_TEST):
return handle_self_test_data(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb",
"RX Demod N0", "RX Datarate"]
rx_status = hk_data[0]
rx_sensitivity = struct.unpack('!I', hk_data[1:5])
rx_frequency_shift = struct.unpack('!I', hk_data[5:9])
rx_iq_power = struct.unpack('!H', hk_data[9:11])
rx_agc_value = struct.unpack('!H', hk_data[11:13])
rx_demod_eb = struct.unpack('!I', hk_data[13:17])
rx_demod_n0 = struct.unpack('!I', hk_data[17:21])
rx_data_rate = hk_data[21]
hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0,
rx_data_rate]
return hk_header, hk_content, validity_buffer, 8
def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack('!H', hk_data[2:4])
hk_content = [tx_status, tx_waveform, tx_agc_value]
return hk_header, hk_content, validity_buffer, 3
def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]",
"Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]",
"Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]",
"Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]",
"Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]",
"Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]",
"Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]",
"Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]"]
# INIT step (no coil actuation)
init_err = hk_data[0]
init_raw_mag_x = struct.unpack('!f', hk_data[1:5])[0]
init_raw_mag_y = struct.unpack('!f', hk_data[5:9])[0]
init_raw_mag_z = struct.unpack('!f', hk_data[9:13])[0]
init_cal_mag_x = struct.unpack('!f', hk_data[13:17])[0]
init_cal_mag_y = struct.unpack('!f', hk_data[17:21])[0]
init_cal_mag_z = struct.unpack('!f', hk_data[21:25])[0]
init_coil_x_current = struct.unpack('!f', hk_data[25:29])[0]
init_coil_y_current = struct.unpack('!f', hk_data[29:33])[0]
init_coil_z_current = struct.unpack('!f', hk_data[33:37])[0]
init_coil_x_temperature = struct.unpack('!H', hk_data[37:39])[0]
init_coil_y_temperature = struct.unpack('!H', hk_data[39:41])[0]
init_coil_z_temperature = struct.unpack('!H', hk_data[41:43])[0]
# Actuation step
err = hk_data[43]
raw_mag_x = struct.unpack('!f', hk_data[44:48])[0]
raw_mag_y = struct.unpack('!f', hk_data[48:52])[0]
raw_mag_z = struct.unpack('!f', hk_data[52:56])[0]
cal_mag_x = struct.unpack('!f', hk_data[56:60])[0]
cal_mag_y = struct.unpack('!f', hk_data[60:64])[0]
cal_mag_z = struct.unpack('!f', hk_data[64:68])[0]
coil_x_current = struct.unpack('!f', hk_data[68:72])[0]
coil_y_current = struct.unpack('!f', hk_data[72:76])[0]
coil_z_current = struct.unpack('!f', hk_data[76:80])[0]
coil_x_temperature = struct.unpack('!H', hk_data[80:82])[0]
coil_y_temperature = struct.unpack('!H', hk_data[82:84])[0]
coil_z_temperature = struct.unpack('!H', hk_data[84:86])[0]
# FINA step (no coil actuation)
fina_err = hk_data[86]
fina_raw_mag_x = struct.unpack('!f', hk_data[87:91])[0]
fina_raw_mag_y = struct.unpack('!f', hk_data[91:95])[0]
fina_raw_mag_z = struct.unpack('!f', hk_data[95:99])[0]
fina_cal_mag_x = struct.unpack('!f', hk_data[99:103])[0]
fina_cal_mag_y = struct.unpack('!f', hk_data[103:107])[0]
fina_cal_mag_z = struct.unpack('!f', hk_data[107:111])[0]
fina_coil_x_current = struct.unpack('!f', hk_data[111:115])[0]
fina_coil_y_current = struct.unpack('!f', hk_data[115:119])[0]
fina_coil_z_current = struct.unpack('!f', hk_data[119:123])[0]
fina_coil_x_temperature = struct.unpack('!H', hk_data[123:125])[0]
fina_coil_y_temperature = struct.unpack('!H', hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack('!H', hk_data[127:129])[0]
hk_content = [init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y,
init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current,
init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x,
init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current,
coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x,
fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current,
fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature,
fina_coil_z_temperature]
return hk_header, hk_content, validity_buffer, len(hk_header)

View File

@ -1,79 +0,0 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import struct
from typing import Tuple
from tmtccmd.pus_tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_logger
from pus_tc.syrlinks_hk_handler import SetIds
from config.object_ids import SYRLINKS_HANDLER
LOGGER = get_logger()
def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
if object_id == SYRLINKS_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_rx_registers_dataset(hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["RX Status", "RX Sensitivity", "RX Frequency Shift", "RX IQ Power", "RX AGC Value", "RX Demod Eb",
"RX Demod N0", "RX Datarate"]
rx_status = hk_data[0]
rx_sensitivity = struct.unpack('!I', hk_data[1:5])
rx_frequency_shift = struct.unpack('!I', hk_data[5:9])
rx_iq_power = struct.unpack('!H', hk_data[9:11])
rx_agc_value = struct.unpack('!H', hk_data[11:13])
rx_demod_eb = struct.unpack('!I', hk_data[13:17])
rx_demod_n0 = struct.unpack('!I', hk_data[17:21])
rx_data_rate = hk_data[21]
hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0,
rx_data_rate]
return hk_header, hk_content, validity_buffer, 8
def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack('!H', hk_data[2:4])
hk_content = [tx_status, tx_waveform, tx_agc_value]
return hk_header, hk_content, validity_buffer, 3

View File

@ -1,5 +1,8 @@
import struct
from typing import Tuple
from config.object_ids import PDU_2_HANDLER_ID
from config.object_ids import *
from pus_tc.imtq import ImtqActionIds
from pus_tc.ploc import PlocReplyIds
def user_analyze_service_8_data(
@ -25,7 +28,31 @@ def user_analyze_service_8_data(
data_string = data_string.rstrip(',')
data_string = data_string.rstrip()
content_list = [data_string]
elif object_id == IMTQ_HANDLER_ID:
return handle_imtq_replies(action_id, custom_data)
elif object_id == PLOC_ID:
return handle_ploc_replies(action_id, custom_data)
else:
header_list = []
content_list = []
return header_list, content_list
def handle_imtq_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
header_list = []
content_list = []
if action_id == struct.unpack('!I', ImtqActionIds.get_commanded_dipole)[0]:
header_list = ['Commanded X-Dipole', 'Commanded Y-Dipole', 'Commanded Z-Dipole']
x_dipole = struct.unpack('!H', custom_data[:2])
y_dipole = struct.unpack('!H', custom_data[2:4])
z_dipole = struct.unpack('!H', custom_data[4:6])
content_list = [x_dipole[0], y_dipole[0], z_dipole[0]]
def handle_ploc_replies(action_id: int, custom_data: bytearray) -> Tuple[list, list]:
header_list = []
content_list = []
if action_id == PlocReplyIds.tm_mem_read_report:
header_list = ['PLOC Memory Address', 'PLOC Mem Len', 'PLOC Read Memory Data']
content_list = [custom_data[:4], custom_data[4:6], custom_data[6:10]]
return header_list, content_list

View File

@ -26,13 +26,29 @@ limitations under the License.
@author R. Mueller
"""
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander
import sys
from config.hook_implementations import EiveHookObject
from config.definitions import PUS_APID
from pus_tm.factory_hook import ccsds_tm_handler
try:
from tmtccmd.runner import initialize_tmtc_commander, run_tmtc_commander, add_ccsds_handler
from tmtccmd.ccsds.handler import CcsdsTmHandler
except ImportError as error:
run_tmtc_commander = None
initialize_tmtc_commander = None
print(error)
print("Python tmtccmd submodule could not be imported")
print("Install with \"cd tmtccmd && python3 -m pip install -e .\" for interactive installation")
sys.exit(0)
def main():
hook_obj = EiveHookObject()
initialize_tmtc_commander(hook_object=hook_obj)
ccsds_handler = CcsdsTmHandler()
ccsds_handler.add_tm_handler(apid=PUS_APID, pus_tm_handler=ccsds_tm_handler, max_queue_len=50)
add_ccsds_handler(ccsds_handler)
run_tmtc_commander(False)

@ -1 +1 @@
Subproject commit 3f39a1ffa17e8c74cadc789b3f4714c4bbd5f75f
Subproject commit b4358a15fd945a9e0103a707b2a2dc56c458b24a