v1.9.0 #53

Merged
muellerr merged 354 commits from develop into master 2022-04-07 17:39:42 +02:00
22 changed files with 652 additions and 429 deletions
Showing only changes of commit dff569e93f - Show all commits

9
.gitignore vendored
View File

@ -1,17 +1,10 @@
__pycache__
log
/gps_log.txt
.idea/*
!.idea/runConfigurations
*.json
/Lib
/Scripts
/pyvenv.cfg
/bin
/lib
/lib64
/share
/venv

3
.gitmodules vendored
View File

@ -1,3 +1,4 @@
[submodule "tmtccmd"]
path = tmtccmd
url = https://github.com/rmspacefish/tmtccmd.git
url = https://github.com/robamu-org/tmtccmd.git

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ACU Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<configuration default="false" name="ACU Test" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GPS 0" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s gps0 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GPS 1" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s gps1 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -1,19 +1,19 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="PDU1 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<configuration default="false" name="PDU1 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Core">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="C:\Users\jakob\AppData\Local\Programs\Python\Python39\python.exe" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="false" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s pdu1 -l -t 6" />
<option name="PARAMETERS" value="-s pdu1 -l -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="PDU2 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<configuration default="false" name="PDU2 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Core">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s pdu2 -l -t 4" />
<option name="PARAMETERS" value="-s pdu2 -l -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Listener Mode" type="PythonConfigurationType" factoryName="Python" folderName="PUS">
<configuration default="false" name="tmtccmd Listener Mode" type="PythonConfigurationType" factoryName="Python">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />

View File

@ -16,6 +16,7 @@ class CustomServiceList(enum.Enum):
PDU1 = "pdu1"
PDU2 = "pdu2"
ACU = "acu"
ACS = "acs"
TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2"
HEATER = "heater"
@ -30,6 +31,8 @@ class CustomServiceList(enum.Enum):
RAD_SENSOR = "rad_sensor"
PLOC_SUPV = "ploc_supv"
PLOC_UPDATER = "ploc_updater"
GPS_0 = "gps0"
GPS_1 = "gps1"
PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
CORE = 'core'
STAR_TRACKER = 'star_tracker'

View File

@ -3,7 +3,6 @@ from typing import Union, Dict, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler
@ -20,6 +19,78 @@ class EiveHookObject(TmTcHookBase):
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict()
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""The user can specify a path and filename for the JSON configuration file by overriding
this function.
:return:
"""
return "config/tmtc_config.json"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x0865
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
if object_id == RW1_ID:
if event_id == 1:
return ""
return ""
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.pdu1 import Pdu1OpCodes
from pus_tc.pdu2 import Pdu2OpCodes
from pus_tc.gps import GpsOpCodes
op_code_dict = {
'reboot': ('Reboot with Prompt', {OpCodeDictKeys.TIMEOUT: 2.0}),
'reboot_self': ('Reboot Self', {OpCodeDictKeys.TIMEOUT: 4.0}),
@ -31,6 +102,13 @@ class EiveHookObject(TmTcHookBase):
service_tuple = ('Core Controller', op_code_dict)
service_op_code_dict[CustomServiceList.CORE.value] = service_tuple
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ('Reset GPS', {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_tuple = ('GPS 0', op_code_dict)
service_op_code_dict[CustomServiceList.GPS_0.value] = service_tuple
service_op_code_dict[CustomServiceList.GPS_1.value] = service_tuple
op_code_dict = {
"0": ("ACU Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
@ -54,26 +132,32 @@ class EiveHookObject(TmTcHookBase):
op_code_dict_srv_pdu1 = {
"0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU1: Turn star tracker on", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.STAR_TRACKER_ON.value:
("PDU1: Turn star tracker on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU1: Get switch state of star tracker", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PDU1: Turn SUS nominal on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("PDU1: Turn star tracker off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.STAR_TRACKER_OFF.value:
("PDU1: Turn star tracker off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PDU1: Turn SUS nominal off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("PDU1: Turn SUS nominal on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("PDU1: Turn SUS nominal off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.ACS_A_SIDE_ON.value:
("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.ACS_A_SIDE_OFF.value:
("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.PRINT_SWITCH_STATE.value:
("PDU1: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1)
op_code_dict_srv_pdu2 = {
"0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PDU1: Turn SUS redundant on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("PDU1: Turn SUS redundant off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PDU1: Turn reaction wheels on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PDU1: Turn reaction wheels off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU2: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU2: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PDU2: Turn SUS redundant on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("PDU2: Turn SUS redundant off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PDU2: Turn reaction wheels on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PDU2: Turn reaction wheels off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.PRINT_SWITCH_STATE.value:
("PDU1: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2)
@ -156,8 +240,10 @@ class EiveHookObject(TmTcHookBase):
"36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}),
"38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}),
"39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"39": ("PLOC Supervisor: Factory reset clear mirror entries",
{OpCodeDictKeys.TIMEOUT: 2.0}),
"40": ("PLOC Supervisor: Factory reset clear circular entries",
{OpCodeDictKeys.TIMEOUT: 2.0}),
"41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
@ -192,75 +278,5 @@ class EiveHookObject(TmTcHookBase):
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = service_ploc_memory_dumper_tuple
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""
The user can specify a path and filename for the JSON configuration file by overriding this function.
:return:
"""
return "config/tmtc_config.json"
def get_version(self) -> str:
from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR
return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x0865
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry:
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
if object_id == RW1_ID:
if event_id == 1:
return ""
return ""
service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = \
service_ploc_memory_dumper_tuple

View File

@ -5,30 +5,53 @@
"""
from typing import Dict
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE])
# Core Object IDs
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2])
# Power Object IDs
PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1])
P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00])
PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01])
PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02])
ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03])
# Thermal Object IDs
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
TMP_1075_1_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x04])
TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05])
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2])
# Communication Object IDs
SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3])
# ACS Object IDs
MGM_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06])
MGM_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07])
MGM_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08])
MGM_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09])
GYRO_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10])
GYRO_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11])
GYRO_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12])
GYRO_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13])
GPS_HANDLER_0_ID = bytes([0x44, 0x13, 0x00, 0x45])
GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x47])
RW2_ID = bytes([0x44, 0x12, 0x01, 0x48])
RW3_ID = bytes([0x44, 0x12, 0x02, 0x49])
RW4_ID = bytes([0x44, 0x12, 0x03, 0x50])
IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14])
PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x1])
RW2_ID = bytes([0x44, 0x12, 0x00, 0x2])
RW3_ID = bytes([0x44, 0x12, 0x00, 0x3])
RW4_ID = bytes([0x44, 0x12, 0x00, 0x4])
# Misc Object IDs
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE])
# Payload Object IDs
STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00])
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
PLOC_MEMORY_DUMPER_ID = bytes([0x44, 0x33, 0x00, 0x01])
PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
def get_object_ids() -> Dict[bytes, list]:
@ -48,6 +71,8 @@ def get_object_ids() -> Dict[bytes, list]:
RW2_ID: "Reaction Wheel 2",
RW3_ID: "Reaction Wheel 3",
RW4_ID: "Reaction Wheel 4",
GPS_HANDLER_0_ID: "GPS 0",
GPS_HANDLER_1_ID: "GPS 1",
RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor",
CORE_CONTROLLER_ID: "Core Controller",

View File

@ -6,15 +6,20 @@
@author J. Meier
@date 17.12.2020
"""
import enum
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.definitions import PusTelecommand
class GomspaceDeviceActions:
PING = bytearray([0x0, 0x0, 0x0, 0x1])
REBOOT = bytearray([0x0, 0x0, 0x0, 0x4])
PARAM_GET = bytearray([0x0, 0x0, 0x0, 0x00])
PARAM_SET = bytearray([0x0, 0x0, 0x0, 0xFF])
WDT_RESET = bytearray([0x0, 0x0, 0x0, 0x9])
REQUEST_HK_TABLE = bytearray([0x0, 0x0, 0x0, 0x10])
class GomspaceDeviceActionIds(enum.IntEnum):
PING = 1
REBOOT = 4
PARAM_GET = 0
PARAM_SET = 255
WDT_RESET = 9
REQUEST_HK_TABLE = 16
PRINT_SWITCH_STATE = 17
class TableIds:
@ -37,8 +42,9 @@ class Channel:
off = 0
def pack_get_param_command(object_id: bytearray, table_id: int, memory_address: bytearray,
parameter_size: int) -> bytearray:
def pack_get_param_command(
object_id: bytearray, table_id: int, memory_address: bytearray, parameter_size: int
) -> PusTelecommand:
""" Function to generate a command to retrieve parameters like the temperature from a gomspace device.
@param object_id: The object id of the gomspace device handler.
@param table_id: The table id of the gomspace device
@ -46,49 +52,54 @@ def pack_get_param_command(object_id: bytearray, table_id: int, memory_address:
@param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2
@return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_GET
command = bytearray()
command = command + object_id + action_id
command.append(table_id)
command = command + memory_address
command.append(parameter_size)
return command
app_data = bytearray()
app_data.append(table_id)
app_data.extend(memory_address)
app_data.append(parameter_size)
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PARAM_GET, app_data=app_data
)
def pack_set_param_command(object_id: bytearray, memory_address: bytearray, parameter_size: int,
parameter: int) -> bytearray:
def pack_set_param_command(
object_id: bytearray, memory_address: bytearray, parameter_size: int, parameter: int,
ssc: int = 0
) -> PusTelecommand:
""" Function to generate a command to set a parameter
@param object_id: The object id of the gomspace device handler.
@param memory_address: Address offset within table of the value to set.
@param parameter: The parameter value to set.
@param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t in the device tables.
@return: The command as bytearray.
:param object_id: The object id of the gomspace device handler.
:param memory_address: Address offset within table of the value to set.
:param parameter: The parameter value to set.
:param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t
in the device tables.
:param ssc:
:return: The command as bytearray.
"""
action_id = GomspaceDeviceActions.PARAM_SET
command = bytearray()
command = command + object_id + action_id
command = command + memory_address
command.append(parameter_size)
action_id = GomspaceDeviceActionIds.PARAM_SET
app_data = bytearray()
app_data += memory_address
app_data.append(parameter_size)
if parameter_size == 1:
command.append(parameter)
app_data.append(parameter)
elif parameter_size == 2:
byte_one = 0xFF00 & parameter >> 8
byte_two = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
app_data.append(byte_one)
app_data.append(byte_two)
elif parameter_size == 4:
byte_one = 0xFF000000 & parameter >> 24
byte_two = 0xFF0000 & parameter >> 16
byte_three = 0xFF00 & parameter >> 8
byte_four = 0xFF & parameter
command.append(byte_one)
command.append(byte_two)
command.append(byte_three)
command.append(byte_four)
return command
app_data.append(byte_one)
app_data.append(byte_two)
app_data.append(byte_three)
app_data.append(byte_four)
return generate_action_command(
object_id=object_id, action_id=action_id, app_data=app_data, ssc=ssc
)
def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand:
"""" Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software
@ -96,37 +107,30 @@ def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
@note The ping request sends the specified data to a gompsace device. These
data are simply copied by the device and then sent back.
"""
action_id = GomspaceDeviceActions.PING
command = object_id + action_id + data
return command
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PING, app_data=data
)
def pack_gnd_wdt_reset_command(object_id: bytearray) -> bytearray:
def pack_gnd_wdt_reset_command(object_id: bytearray) -> PusTelecommand:
"""" Function to generate the command to reset the watchdog of a gomspace device.
@param object_id Object Id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.WDT_RESET
command = bytearray()
command += object_id + action_id
return command
return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.WDT_RESET)
def pack_reboot_command(object_id: bytearray) -> bytearray:
def pack_reboot_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command which triggers a reboot of a gomspace device
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REBOOT
command = bytearray()
command += object_id + action_id
return command
return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.REBOOT)
def pack_request_full_hk_table_command(object_id: bytearray) -> bytearray:
def pack_request_full_hk_table_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command to request the full housekeeping table from a gomspace
device.
@param object_id The object id of the gomspace device handler.
"""
action_id = GomspaceDeviceActions.REQUEST_HK_TABLE
command = bytearray()
command = object_id + action_id
return command
return generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE
)

25
pus_tc/acs.py Normal file
View File

@ -0,0 +1,25 @@
import enum
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.pus.service_list import PusServices
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.packer import PusTelecommand
from config.object_ids import MGM_0_HANDLER_ID, MGM_1_HANDLER_ID, MGM_2_HANDLER_ID, MGM_3_HANDLER_ID
class AcsOpCodes(enum.Enum):
# Switch on A side
ON_MGM_0 = "0"
ON_MGM_1 = "1"
ON_GYRO_0 = "2"
ON_GYRO_1 = "3"
# Switch on B side
ON_MGM_2 = "4"
ON_MGM_3 = "5"
ON_GYRO_2 = "6"
ON_GYRO_3 = "7"
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
if op_code == AcsOpCodes.ON_MGM_0.value:
app_data = pack_mode_data(object_id=MGM_0_HANDLER_ID, mode=Modes.ON, submode=0)
# return PusTelecommand(service=PusServices.SERVICE_200_MODE, subservice=)

View File

@ -5,9 +5,7 @@
@author J. Meier
@date 21.12.2020
"""
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable
@ -64,13 +62,13 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT):
p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on
)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ground watchdog timer value"))
@ -78,54 +76,54 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT):
object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address,
ACUHkTable.wdt_gnd_left.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_temperature3:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3"))
command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address,
ACUHkTable.temperature3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vboost:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address,
ACUConfigTable.vboost.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address,
ACUConfigTable.vbat_max_hi.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address,
ACUConfigTable.vbat_max_lo.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address,
ACUConfigTable.ov_mode.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU"))
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

22
pus_tc/gps.py Normal file
View File

@ -0,0 +1,22 @@
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID
class GpsOpCodes(enum.Enum):
RESET_GNSS = "5"
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
if op_code == GpsOpCodes.RESET_GNSS.value:
if object_id == GPS_HANDLER_0_ID:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0"))
elif object_id == GPS_HANDLER_1_ID:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1"))
cmd = generate_action_command(object_id=object_id, action_id=int(op_code))
tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -61,25 +61,27 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on"))
parameter = 0 # set channel off
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
command = pack_set_param_command(
object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.on
)
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off"))
parameter = 0 # set channel off
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
command = pack_set_param_command(
object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.off
)
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if P60DockTestProcedure.all or P60DockTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value"))
@ -87,25 +89,25 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address,
P60DockHkTable.wdt_gnd_left.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Ping"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 off"))
parameter = 0 # set channel off
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing temperature reading"))
@ -113,14 +115,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing Output Channel 3 state (PDU2)"))
command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT,
@ -129,14 +131,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address,
P60DockConfigTable.cur_lu_lim_0.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 on"))
parameter = 1 # set channel on
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid table id handling"))
@ -145,7 +147,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test:
tc_queue.appendleft((QueueCommands.PRINT,
@ -153,7 +155,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
invalid_address = bytearray([0x01, 0xF4])
command = pack_get_param_command(object_id, TableIds.hk, invalid_address,
P60DockHkTable.temperature1.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing invalid address handling in set param command"))
@ -161,7 +163,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
parameter_size = 2
parameter = 1
command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test:
tc_queue.appendleft(
@ -172,7 +174,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
command = pack_get_param_command(
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size
)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(
(QueueCommands.PRINT,
@ -182,7 +184,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
command = pack_set_param_command(
object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter
)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
# command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@ -6,6 +6,7 @@
@author J. Meier
@date 17.12.2020
"""
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
@ -15,6 +16,14 @@ from pus_tc.p60dock import P60DockConfigTable
from gomspace.gomspace_pdu_definitions import *
class Pdu1OpCodes(enum.Enum):
STAR_TRACKER_ON = "1"
STAR_TRACKER_OFF = "4"
ACS_A_SIDE_ON = "6"
ACS_A_SIDE_OFF = "7"
PRINT_SWITCH_STATE = "17"
class PDU1TestProcedure:
"""
@brief Use this class to define the tests to perform for the PDU2.
@ -31,44 +40,48 @@ class PDU1TestProcedure:
turn_channel_3_off = False
def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft((QueueCommands.PRINT, "Commanding PDU1"))
if op_code == "1":
if op_code == Pdu1OpCodes.STAR_TRACKER_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
if op_code == Pdu1OpCodes.STAR_TRACKER_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
if op_code == Pdu1OpCodes.ACS_A_SIDE_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "7":
if op_code == Pdu1OpCodes.ACS_A_SIDE_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_STATE
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on"))
@ -87,7 +100,6 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str)
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading"))
@ -95,29 +107,32 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str)
object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_3.parameter_address,
PDUConfigTable.out_en_3.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -8,10 +8,14 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import *
from pus_tc.p60dock import P60DockConfigTable
class Pdu2OpCodes(enum.Enum):
ACS_SIDE_B_ON = "1"
ACS_SIDE_B_OFF = "2"
PRINT_SWITCH_STATE = "17"
class PDU2TestProcedure:
@ -26,7 +30,7 @@ class PDU2TestProcedure:
gnd_wdt_reset = False
ping = False
channel_2_off = False # Reaction wheels 5V
read_temperature = True
read_temperature = False
read_channel_2_state = False # Reaction wheels 5V
read_cur_lu_lim_0 = False # OBC
channel_2_on = False # Reaction wheels 5V
@ -39,18 +43,20 @@ class PDU2TestProcedure:
def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2"))
if op_code == "1":
if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if op_code == "2":
if op_code == Pdu2OpCodes.ACS_SIDE_B_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
command = pack_set_param_command(
object_id, PDUConfigTable.out_en_7.parameter_address,
PDUConfigTable.out_en_7.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
if op_code == "3":
@ -76,64 +82,73 @@ def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str)
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command)
if op_code == Pdu2OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_STATE
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reboot"))
command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
PDUHkTable.wdt_gnd_left.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
command = pack_get_param_command(
object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
PDUHkTable.wdt_gnd_left.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)"))
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)")
)
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
command = pack_get_param_command(
object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)"))
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)")
)
command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)"))
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)")
)
command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.request_hk_table:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting housekeeping table"))
command = pack_request_full_hk_table_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue

View File

@ -1,9 +1,5 @@
"""Hook function which packs telecommands based on service and operation code string
"""
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os
from collections import deque
from typing import Union
@ -30,10 +26,13 @@ from pus_tc.ploc_upater import pack_ploc_updater_test_into
from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
from pus_tc.core import pack_core_commands
from pus_tc.star_tracker import pack_star_tracker_commands_into
from pus_tc.gps import pack_gps_command
from pus_tc.acs import pack_acs_command
from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \
RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \
ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \
PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \
STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
LOGGER = get_console_logger()
@ -87,21 +86,37 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.RAD_SENSOR.value:
object_id = RAD_SENSOR_ID
return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_rad_sensor_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.PLOC_SUPV.value:
object_id = PLOC_SUPV_ID
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_ploc_supv_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.PLOC_UPDATER.value:
object_id = PLOC_UPDATER_ID
return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_ploc_updater_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.STAR_TRACKER.value:
object_id = STAR_TRACKER_ID
return pack_star_tracker_commands_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_star_tracker_commands_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.CORE.value:
return pack_core_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC_MEMORY_DUMPER.value:
object_id = PLOC_MEMORY_DUMPER_ID
return pack_ploc_memory_dumper_cmd(object_id=object_id, tc_queue=service_queue, op_code=op_code)
return pack_ploc_memory_dumper_cmd(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.ACS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.GPS_0.value:
return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.GPS_1.value:
return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !")

View File

@ -27,16 +27,16 @@ def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
service_type = raw_tm_packet[7]
tm_packet = None
if service_type == 1:
tm_packet = Service1TM(raw_tm_packet)
tm_packet = Service1TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 3:
tm_packet = Service3TM(raw_tm_packet)
tm_packet = Service3TM.unpack(raw_telemetry=raw_tm_packet, custom_hk_handling=False)
if service_type == 5:
tm_packet = Service5TM(raw_tm_packet)
tm_packet = Service5TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 8:
tm_packet = Service8TM(raw_tm_packet)
tm_packet = Service8TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 17:
tm_packet = Service17TM(raw_tm_packet)
tm_packet = Service17TM.unpack(raw_telemetry=raw_tm_packet)
if tm_packet is None:
LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory')
tm_packet = PusTelemetry(raw_tm_packet)
tmtc_printer.print_telemetry(packet=tm_packet)
tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet, print_raw_tm=False)

View File

@ -1,40 +1,22 @@
"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
"""HK Handling for EIVE OBSW"""
import struct
import os
import datetime
from typing import Tuple
from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger
from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds
from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID
from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
LOGGER = get_console_logger()
def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
def handle_user_hk_packet(
object_id: bytes, set_id: int, hk_data: bytearray,
service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
"""This function is called when a Service 3 Housekeeping packet is received.
"""
if object_id == SYRLINKS_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET:
@ -50,6 +32,8 @@ def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0
@ -90,16 +74,21 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]",
"Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]",
"Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]",
hk_header = [
"Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]",
"Init Cal Mag X [nT]", "Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]",
"Init Coil X Current [mA]", "Init Coil Y Current [mA]", "Init Coil Z Current [mA]",
"Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]",
"Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]",
"Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]",
"Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]",
"Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]",
"Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]"]
"Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]",
"Coil Y Current [mA]", "Coil Z Current [mA]",
"Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]",
"Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]",
"Fina Cal Mag X [nT]", "Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]",
"Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", "Fina Coil Z Current [mA]",
"Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]"
]
# INIT step (no coil actuation)
init_err = hk_data[0]
init_raw_mag_x = struct.unpack('!f', hk_data[1:5])[0]
@ -145,13 +134,65 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature = struct.unpack('!H', hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack('!H', hk_data[127:129])[0]
hk_content = [init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y,
hk_content = [
init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y,
init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current,
init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x,
init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current,
coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x,
fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current,
fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature,
fina_coil_z_temperature]
init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err,
raw_mag_x, init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z,
coil_x_current, coil_y_current, coil_z_current,
coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err,
fina_raw_mag_x, fina_raw_mag_y, fina_raw_mag_z,
fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z,
fina_coil_x_current, fina_coil_y_current, fina_coil_z_current,
fina_coil_x_temperature, fina_coil_y_temperature, fina_coil_z_temperature
]
return hk_header, hk_content, validity_buffer, len(hk_header)
def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
LOGGER.info(f'Received GPS data, HK data length {len(hk_data)}')
var_index = 0
header_array = []
content_array = []
latitude = struct.unpack('!d', hk_data[0:8])[0]
header_array.append('Latitude')
content_array.append(latitude)
longitude = struct.unpack('!d', hk_data[8:16])[0]
header_array.append('Longitude')
content_array.append(longitude)
altitude = struct.unpack('!d', hk_data[16:24])[0]
header_array.append('Altitude')
content_array.append(altitude)
fix_mode = hk_data[24]
header_array.append('Fix Mode')
content_array.append(fix_mode)
sat_in_use = hk_data[25]
header_array.append('Sats in Use')
content_array.append(sat_in_use)
year = struct.unpack('!H', hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
header_array.append('Date')
date_string = f'{day}.{month}.{year} {hours}:{minutes}:{seconds}'
content_array.append(date_string)
unix_seconds = struct.unpack('!I', hk_data[33:37])[0]
header_array.append('Unix Seconds')
content_array.append(unix_seconds)
var_index += 13
if not os.path.isfile('gps_log.txt'):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
'Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, '
'Date, Unix Seconds\n'
)
with open("gps_log.txt", "a") as gps_file:
gps_file.write(
f'{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, '
f'{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n'
)
validity_buffer = hk_data[37:39]
return header_array, content_array, validity_buffer, var_index

@ -1 +1 @@
Subproject commit 94978512deb6c91a6a1456ae3a1182bf679f063a
Subproject commit 38be11cab16c036fb0968f01d46d198be4cd058f