Update tmtccmd #21

Merged
meierj merged 32 commits from mueller/master into develop 2021-09-15 14:59:39 +02:00
22 changed files with 649 additions and 425 deletions

9
.gitignore vendored
View File

@ -1,17 +1,10 @@
__pycache__ __pycache__
log log
/gps_log.txt
.idea/* .idea/*
!.idea/runConfigurations !.idea/runConfigurations
*.json *.json
/Lib
/Scripts
/pyvenv.cfg
/bin
/lib
/lib64
/share
/venv /venv

3
.gitmodules vendored
View File

@ -1,3 +1,4 @@
[submodule "tmtccmd"] [submodule "tmtccmd"]
path = tmtccmd path = tmtccmd
url = https://github.com/rmspacefish/tmtccmd.git url = https://github.com/robamu-org/tmtccmd.git

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="ACU Test UDP" type="PythonConfigurationType" factoryName="Python" folderName="Devices"> <configuration default="false" name="ACU Test" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GPS 0" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s gps0 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="GPS 1" type="PythonConfigurationType" factoryName="Python" folderName="Devices">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s gps1 -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -1,19 +1,19 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="PDU1 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Devices"> <configuration default="false" name="PDU1 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Core">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="C:\Users\jakob\AppData\Local\Programs\Python\Python39\python.exe" /> <option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s pdu1 -l -t 6" /> <option name="PARAMETERS" value="-s pdu1 -l -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="PDU2 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Devices"> <configuration default="false" name="PDU2 Commanding" type="PythonConfigurationType" factoryName="Python" folderName="Core">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-s pdu2 -l -t 4" /> <option name="PARAMETERS" value="-s pdu2 -l -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />

View File

@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcc Listener Mode" type="PythonConfigurationType" factoryName="Python" folderName="PUS"> <configuration default="false" name="tmtccmd Listener Mode" type="PythonConfigurationType" factoryName="Python">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />

View File

@ -16,6 +16,7 @@ class CustomServiceList(enum.Enum):
PDU1 = "pdu1" PDU1 = "pdu1"
PDU2 = "pdu2" PDU2 = "pdu2"
ACU = "acu" ACU = "acu"
ACS = "acs"
TMP1075_1 = "tmp1075_1" TMP1075_1 = "tmp1075_1"
TMP1075_2 = "tmp1075_2" TMP1075_2 = "tmp1075_2"
HEATER = "heater" HEATER = "heater"
@ -30,6 +31,8 @@ class CustomServiceList(enum.Enum):
RAD_SENSOR = "rad_sensor" RAD_SENSOR = "rad_sensor"
PLOC_SUPV = "ploc_supv" PLOC_SUPV = "ploc_supv"
PLOC_UPDATER = "ploc_updater" PLOC_UPDATER = "ploc_updater"
GPS_0 = "gps0"
GPS_1 = "gps1"
PLOC_MEMORY_DUMPER = "ploc_memory_dumper" PLOC_MEMORY_DUMPER = "ploc_memory_dumper"
CORE = 'core' CORE = 'core'
STAR_TRACKER = 'star_tracker' STAR_TRACKER = 'star_tracker'

View File

@ -3,7 +3,6 @@ from typing import Union, Dict, Tuple
from tmtccmd.config.definitions import ServiceOpCodeDictT from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.tm.service_3_base import Service3Base
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.tc.definitions import TcQueueT from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.core.backend import TmTcHandler from tmtccmd.core.backend import TmTcHandler
@ -20,6 +19,76 @@ class EiveHookObject(TmTcHookBase):
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT: def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
from tmtccmd.config.globals import get_default_service_op_code_dict from tmtccmd.config.globals import get_default_service_op_code_dict
service_op_code_dict = get_default_service_op_code_dict() service_op_code_dict = get_default_service_op_code_dict()
get_eive_service_op_code_dict(service_op_code_dict=service_op_code_dict)
return service_op_code_dict
def get_json_config_file_path(self) -> str:
"""The user can specify a path and filename for the JSON configuration file by overriding
this function.
:return:
"""
return "config/tmtc_config.json"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x0865
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
if object_id == RW1_ID:
if event_id == 1:
return ""
return ""
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
from pus_tc.pdu1 import Pdu1OpCodes
from pus_tc.pdu2 import Pdu2OpCodes
from pus_tc.gps import GpsOpCodes
op_code_dict = { op_code_dict = {
'reboot': ('Reboot with Prompt', {OpCodeDictKeys.TIMEOUT: 2.0}), 'reboot': ('Reboot with Prompt', {OpCodeDictKeys.TIMEOUT: 2.0}),
'reboot_self': ('Reboot Self', {OpCodeDictKeys.TIMEOUT: 4.0}), 'reboot_self': ('Reboot Self', {OpCodeDictKeys.TIMEOUT: 4.0}),
@ -31,6 +100,13 @@ class EiveHookObject(TmTcHookBase):
service_tuple = ('Core Controller', op_code_dict) service_tuple = ('Core Controller', op_code_dict)
service_op_code_dict[CustomServiceList.CORE.value] = service_tuple service_op_code_dict[CustomServiceList.CORE.value] = service_tuple
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ('Reset GPS', {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_tuple = ('GPS 0', op_code_dict)
service_op_code_dict[CustomServiceList.GPS_0.value] = service_tuple
service_op_code_dict[CustomServiceList.GPS_1.value] = service_tuple
op_code_dict = { op_code_dict = {
"0": ("ACU Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), "0": ("ACU Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
} }
@ -54,13 +130,19 @@ class EiveHookObject(TmTcHookBase):
op_code_dict_srv_pdu1 = { op_code_dict_srv_pdu1 = {
"0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), "0": ("PDU1 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU1: Turn star tracker on", {OpCodeDictKeys.TIMEOUT: 2.0}), Pdu1OpCodes.STAR_TRACKER_ON.value:
("PDU1: Turn star tracker on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU1: Get switch state of star tracker", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("PDU1: Get switch state of star tracker", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PDU1: Turn SUS nominal on", {OpCodeDictKeys.TIMEOUT: 2.0}), "3": ("PDU1: Turn SUS nominal on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("PDU1: Turn star tracker off", {OpCodeDictKeys.TIMEOUT: 2.0}), Pdu1OpCodes.STAR_TRACKER_OFF.value:
("PDU1: Turn star tracker off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PDU1: Turn SUS nominal off", {OpCodeDictKeys.TIMEOUT: 2.0}), "5": ("PDU1: Turn SUS nominal off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}), Pdu1OpCodes.ACS_A_SIDE_ON.value:
"7": ("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}), ("PDU1: Turn ACS Side A on", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.ACS_A_SIDE_OFF.value:
("PDU1: Turn ACS Side A off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu1OpCodes.PRINT_SWITCH_STATE.value:
("PDU1: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
} }
service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1) service_pdu1_tuple = ("PDU1 Device", op_code_dict_srv_pdu1)
@ -68,6 +150,8 @@ class EiveHookObject(TmTcHookBase):
"0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}), "0": ("PDU2 Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDU1: Turn ACS Side B on", {OpCodeDictKeys.TIMEOUT: 2.0}), "1": ("PDU1: Turn ACS Side B on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PDU1: Turn ACS Side B off", {OpCodeDictKeys.TIMEOUT: 2.0}), "2": ("PDU1: Turn ACS Side B off", {OpCodeDictKeys.TIMEOUT: 2.0}),
Pdu2OpCodes.PRINT_SWITCH_STATE.value:
("PDU2: Print switch states", {OpCodeDictKeys.TIMEOUT: 2.0})
} }
service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2) service_pdu2_tuple = ("PDU2 Device", op_code_dict_srv_pdu2)
@ -150,8 +234,10 @@ class EiveHookObject(TmTcHookBase):
"36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}), "36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}), "37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}),
"38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}), "38": ("PLOC Supervisor: Factory reset clear all", {OpCodeDictKeys.TIMEOUT: 2.0}),
"39": ("PLOC Supervisor: Factory reset clear mirror entries", {OpCodeDictKeys.TIMEOUT: 2.0}), "39": ("PLOC Supervisor: Factory reset clear mirror entries",
"40": ("PLOC Supervisor: Factory reset clear circular entries", {OpCodeDictKeys.TIMEOUT: 2.0}), {OpCodeDictKeys.TIMEOUT: 2.0}),
"40": ("PLOC Supervisor: Factory reset clear circular entries",
{OpCodeDictKeys.TIMEOUT: 2.0}),
"41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}), "41": ("PLOC Supervisor: CAN loopback test", {OpCodeDictKeys.TIMEOUT: 2.0}),
} }
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv) service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
@ -186,75 +272,5 @@ class EiveHookObject(TmTcHookBase):
service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple service_op_code_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple service_op_code_dict[CustomServiceList.PLOC_UPDATER.value] = service_ploc_updater_tuple
service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple service_op_code_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = service_ploc_memory_dumper_tuple service_op_code_dict[CustomServiceList.PLOC_MEMORY_DUMPER.value] = \
return service_op_code_dict service_ploc_memory_dumper_tuple
def get_json_config_file_path(self) -> str:
"""
The user can specify a path and filename for the JSON configuration file by overriding this function.
:return:
"""
return "config/tmtc_config.json"
def get_version(self) -> str:
from config.version import SW_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_SUBMINOR
return f"{SW_NAME} {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_SUBMINOR}"
def add_globals_pre_args_parsing(self, gui: bool = False):
from config.globals_config import set_globals_pre_args_parsing
set_globals_pre_args_parsing(gui=gui)
def add_globals_post_args_parsing(self, args: argparse.Namespace):
from config.globals_config import add_globals_post_args_parsing
add_globals_post_args_parsing(args=args, json_cfg_path=self.get_json_config_file_path())
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.config.com_if import create_communication_interface_default
return create_communication_interface_default(
com_if_key=com_if_key, tmtc_printer=tmtc_printer,
json_cfg_path=self.get_json_config_file_path(), space_packet_id=0x0865
)
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from pus_tc.tc_packer_hook import pack_service_queue_user
pack_service_queue_user(service=service, op_code=op_code, service_queue=service_queue)
def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry:
from pus_tm.factory_hook import tm_user_factory_hook
return tm_user_factory_hook(raw_tm_packet=raw_tm_packet)
def get_object_ids(self) -> Dict[bytes, list]:
from config.object_ids import get_object_ids
return get_object_ids()
@staticmethod
def handle_service_8_telemetry(
object_id: bytes, action_id: int, custom_data: bytearray
) -> Tuple[list, list]:
from pus_tm.service_8_hook import user_analyze_service_8_data
return user_analyze_service_8_data(
object_id=object_id, action_id=action_id, custom_data=custom_data
)
@staticmethod
def handle_service_3_housekeeping(
object_id: bytes, set_id: int, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
from pus_tm.hk_handling import handle_user_hk_packet
return handle_user_hk_packet(
object_id=object_id, set_id=set_id, hk_data=hk_data, service3_packet=service3_packet
)
@staticmethod
def handle_service_5_event(
object_id: bytes, event_id: int, param_1: int, param_2: int
) -> str:
if object_id == RW1_ID:
if event_id == 1:
return ""
return ""

View File

@ -5,30 +5,53 @@
""" """
from typing import Dict from typing import Dict
PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17]) # Core Object IDs
TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE]) CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2])
# Power Object IDs
PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1])
P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00]) P60_DOCK_HANDLER = bytes([0x44, 0x25, 0x00, 0x00])
PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01]) PDU_1_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x01])
PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02]) PDU_2_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x02])
ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03]) ACU_HANDLER_ID = bytes([0x44, 0x25, 0x00, 0x03])
# Thermal Object IDs
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
TMP_1075_1_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x04]) TMP_1075_1_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x04])
TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05]) TMP_1075_2_HANDLER_ID = bytes([0x44, 0x42, 0x00, 0x05])
HEATER_ID = bytes([0x44, 0x41, 0x00, 0xA4])
PCDU_HANDLER_ID = bytes([0x44, 0x20, 0x00, 0xA1]) # Communication Object IDs
SOLAR_ARRAY_DEPLOYMENT_ID = bytes([0x44, 0x41, 0x10, 0xA2])
SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3]) SYRLINKS_HANDLER = bytes([0x44, 0x53, 0x00, 0xA3])
# ACS Object IDs
MGM_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x06])
MGM_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x07])
MGM_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x08])
MGM_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x09])
GYRO_0_HANDLER_ID = bytes([0x44, 0x12, 0x00, 0x10])
GYRO_1_HANDLER_ID = bytes([0x44, 0x12, 0x01, 0x11])
GYRO_2_HANDLER_ID = bytes([0x44, 0x12, 0x02, 0x12])
GYRO_3_HANDLER_ID = bytes([0x44, 0x12, 0x03, 0x13])
GPS_HANDLER_0_ID = bytes([0x44, 0x13, 0x00, 0x45])
GPS_HANDLER_1_ID = bytes([0x44, 0x13, 0x01, 0x46])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x47])
RW2_ID = bytes([0x44, 0x12, 0x01, 0x48])
RW3_ID = bytes([0x44, 0x12, 0x02, 0x49])
RW4_ID = bytes([0x44, 0x12, 0x03, 0x50])
IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14]) IMTQ_HANDLER_ID = bytes([0x44, 0x14, 0x00, 0x14])
PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
RW1_ID = bytes([0x44, 0x12, 0x00, 0x1]) # Misc Object IDs
RW2_ID = bytes([0x44, 0x12, 0x00, 0x2]) PUS_SERVICE_17_ID = bytes([0x53, 0x00, 0x00, 0x17])
RW3_ID = bytes([0x44, 0x12, 0x00, 0x3]) TEST_DEVICE_ID = bytes([0x54, 0x00, 0xAF, 0xFE])
RW4_ID = bytes([0x44, 0x12, 0x00, 0x4])
# Payload Object IDs
STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1]) STAR_TRACKER_ID = bytes([0x44, 0x13, 0x00, 0x1])
RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5]) RAD_SENSOR_ID = bytes([0x44, 0x32, 0x00, 0xA5])
PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16]) PLOC_SUPV_ID = bytes([0x44, 0x33, 0x00, 0x16])
PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00]) PLOC_UPDATER_ID = bytes([0x44, 0x33, 0x00, 0x00])
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])
PLOC_MEMORY_DUMPER_ID = bytes([0x44, 0x33, 0x00, 0x01]) PLOC_MEMORY_DUMPER_ID = bytes([0x44, 0x33, 0x00, 0x01])
PLOC_MPSOC_ID = bytes([0x44, 0x33, 0x00, 0x15])
def get_object_ids() -> Dict[bytes, list]: def get_object_ids() -> Dict[bytes, list]:
@ -48,6 +71,8 @@ def get_object_ids() -> Dict[bytes, list]:
RW2_ID: "Reaction Wheel 2", RW2_ID: "Reaction Wheel 2",
RW3_ID: "Reaction Wheel 3", RW3_ID: "Reaction Wheel 3",
RW4_ID: "Reaction Wheel 4", RW4_ID: "Reaction Wheel 4",
GPS_HANDLER_0_ID: "GPS 0",
GPS_HANDLER_1_ID: "GPS 1",
RAD_SENSOR_ID: "Radiation Sensor", RAD_SENSOR_ID: "Radiation Sensor",
PLOC_SUPV_ID: "PLOC Supervisor", PLOC_SUPV_ID: "PLOC Supervisor",
CORE_CONTROLLER_ID: "Core Controller", CORE_CONTROLLER_ID: "Core Controller",

View File

@ -6,15 +6,20 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
import enum
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.definitions import PusTelecommand
class GomspaceDeviceActions: class GomspaceDeviceActionIds(enum.IntEnum):
PING = bytearray([0x0, 0x0, 0x0, 0x1]) PING = 1
REBOOT = bytearray([0x0, 0x0, 0x0, 0x4]) REBOOT = 4
PARAM_GET = bytearray([0x0, 0x0, 0x0, 0x00]) PARAM_GET = 0
PARAM_SET = bytearray([0x0, 0x0, 0x0, 0xFF]) PARAM_SET = 255
WDT_RESET = bytearray([0x0, 0x0, 0x0, 0x9]) WDT_RESET = 9
REQUEST_HK_TABLE = bytearray([0x0, 0x0, 0x0, 0x10]) REQUEST_HK_TABLE = 16
PRINT_SWITCH_STATE = 17
class TableIds: class TableIds:
@ -37,8 +42,9 @@ class Channel:
off = 0 off = 0
def pack_get_param_command(object_id: bytearray, table_id: int, memory_address: bytearray, def pack_get_param_command(
parameter_size: int) -> bytearray: object_id: bytearray, table_id: int, memory_address: bytearray, parameter_size: int
) -> PusTelecommand:
""" Function to generate a command to retrieve parameters like the temperature from a gomspace device. """ Function to generate a command to retrieve parameters like the temperature from a gomspace device.
@param object_id: The object id of the gomspace device handler. @param object_id: The object id of the gomspace device handler.
@param table_id: The table id of the gomspace device @param table_id: The table id of the gomspace device
@ -46,49 +52,54 @@ def pack_get_param_command(object_id: bytearray, table_id: int, memory_address:
@param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2 @param parameter_size: Size of the value to read. E.g. temperature is uint16_t and thus parameter_size is 2
@return: The command as bytearray. @return: The command as bytearray.
""" """
action_id = GomspaceDeviceActions.PARAM_GET app_data = bytearray()
command = bytearray() app_data.append(table_id)
command = command + object_id + action_id app_data.extend(memory_address)
command.append(table_id) app_data.append(parameter_size)
command = command + memory_address return generate_action_command(
command.append(parameter_size) object_id=object_id, action_id=GomspaceDeviceActionIds.PARAM_GET, app_data=app_data
return command )
def pack_set_param_command(object_id: bytearray, memory_address: bytearray, parameter_size: int, def pack_set_param_command(
parameter: int) -> bytearray: object_id: bytearray, memory_address: bytearray, parameter_size: int, parameter: int,
ssc: int = 0
) -> PusTelecommand:
""" Function to generate a command to set a parameter """ Function to generate a command to set a parameter
@param object_id: The object id of the gomspace device handler. :param object_id: The object id of the gomspace device handler.
@param memory_address: Address offset within table of the value to set. :param memory_address: Address offset within table of the value to set.
@param parameter: The parameter value to set. :param parameter: The parameter value to set.
@param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t in the device tables. :param parameter_size: Size of the value to set. There are uint8_t, uint16_t and uint32_t
@return: The command as bytearray. in the device tables.
:param ssc:
:return: The command as bytearray.
""" """
action_id = GomspaceDeviceActions.PARAM_SET action_id = GomspaceDeviceActionIds.PARAM_SET
command = bytearray() app_data = bytearray()
command = command + object_id + action_id app_data += memory_address
command = command + memory_address app_data.append(parameter_size)
command.append(parameter_size)
if parameter_size == 1: if parameter_size == 1:
command.append(parameter) app_data.append(parameter)
elif parameter_size == 2: elif parameter_size == 2:
byte_one = 0xFF00 & parameter >> 8 byte_one = 0xFF00 & parameter >> 8
byte_two = 0xFF & parameter byte_two = 0xFF & parameter
command.append(byte_one) app_data.append(byte_one)
command.append(byte_two) app_data.append(byte_two)
elif parameter_size == 4: elif parameter_size == 4:
byte_one = 0xFF000000 & parameter >> 24 byte_one = 0xFF000000 & parameter >> 24
byte_two = 0xFF0000 & parameter >> 16 byte_two = 0xFF0000 & parameter >> 16
byte_three = 0xFF00 & parameter >> 8 byte_three = 0xFF00 & parameter >> 8
byte_four = 0xFF & parameter byte_four = 0xFF & parameter
command.append(byte_one) app_data.append(byte_one)
command.append(byte_two) app_data.append(byte_two)
command.append(byte_three) app_data.append(byte_three)
command.append(byte_four) app_data.append(byte_four)
return command return generate_action_command(
object_id=object_id, action_id=action_id, app_data=app_data, ssc=ssc
)
def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray: def pack_ping_command(object_id: bytearray, data: bytearray) -> PusTelecommand:
"""" Function to generate the command to ping a gomspace device """" Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler. @param object_id Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software @param data Bytearray containing the bytes to send to the gomspace device. For now the on board software
@ -96,37 +107,30 @@ def pack_ping_command(object_id: bytearray, data: bytearray) -> bytearray:
@note The ping request sends the specified data to a gompsace device. These @note The ping request sends the specified data to a gompsace device. These
data are simply copied by the device and then sent back. data are simply copied by the device and then sent back.
""" """
action_id = GomspaceDeviceActions.PING return generate_action_command(
command = object_id + action_id + data object_id=object_id, action_id=GomspaceDeviceActionIds.PING, app_data=data
return command )
def pack_gnd_wdt_reset_command(object_id: bytearray) -> bytearray: def pack_gnd_wdt_reset_command(object_id: bytearray) -> PusTelecommand:
"""" Function to generate the command to reset the watchdog of a gomspace device. """" Function to generate the command to reset the watchdog of a gomspace device.
@param object_id Object Id of the gomspace device handler. @param object_id Object Id of the gomspace device handler.
""" """
action_id = GomspaceDeviceActions.WDT_RESET return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.WDT_RESET)
command = bytearray()
command += object_id + action_id
return command
def pack_reboot_command(object_id: bytearray) -> bytearray: def pack_reboot_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command which triggers a reboot of a gomspace device """ Function to generate the command which triggers a reboot of a gomspace device
@param object_id The object id of the gomspace device handler. @param object_id The object id of the gomspace device handler.
""" """
action_id = GomspaceDeviceActions.REBOOT return generate_action_command(object_id=object_id, action_id=GomspaceDeviceActionIds.REBOOT)
command = bytearray()
command += object_id + action_id
return command
def pack_request_full_hk_table_command(object_id: bytearray) -> bytearray: def pack_request_full_hk_table_command(object_id: bytearray) -> PusTelecommand:
""" Function to generate the command to request the full housekeeping table from a gomspace """ Function to generate the command to request the full housekeeping table from a gomspace
device. device.
@param object_id The object id of the gomspace device handler. @param object_id The object id of the gomspace device handler.
""" """
action_id = GomspaceDeviceActions.REQUEST_HK_TABLE return generate_action_command(
command = bytearray() object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE
command = object_id + action_id )
return command

25
pus_tc/acs.py Normal file
View File

@ -0,0 +1,25 @@
import enum
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.pus.service_list import PusServices
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.packer import PusTelecommand
from config.object_ids import MGM_0_HANDLER_ID, MGM_1_HANDLER_ID, MGM_2_HANDLER_ID, MGM_3_HANDLER_ID
class AcsOpCodes(enum.Enum):
# Switch on A side
ON_MGM_0 = "0"
ON_MGM_1 = "1"
ON_GYRO_0 = "2"
ON_GYRO_1 = "3"
# Switch on B side
ON_MGM_2 = "4"
ON_MGM_3 = "5"
ON_GYRO_2 = "6"
ON_GYRO_3 = "7"
def pack_acs_command(tc_queue: TcQueueT, op_code: str):
if op_code == AcsOpCodes.ON_MGM_0.value:
app_data = pack_mode_data(object_id=MGM_0_HANDLER_ID, mode=Modes.ON, submode=0)
# return PusTelecommand(service=PusServices.SERVICE_200_MODE, subservice=)

View File

@ -5,9 +5,7 @@
@author J. Meier @author J. Meier
@date 21.12.2020 @date 21.12.2020
""" """
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from pus_tc.p60dock import P60DockConfigTable from pus_tc.p60dock import P60DockConfigTable
@ -64,13 +62,13 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT):
p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address, p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.on P60DockConfigTable.out_en_0.parameter_size, Channel.on
) )
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.reboot: if ACUTestProcedure.all or ACUTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reboot"))
command = pack_reboot_command(object_id) command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt: if ACUTestProcedure.all or ACUTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ground watchdog timer value")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ground watchdog timer value"))
@ -78,54 +76,54 @@ def pack_acu_test_into(object_id: bytearray, tc_queue: TcQueueT):
object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address, object_id, TableIds.hk, ACUHkTable.wdt_gnd_left.parameter_address,
ACUHkTable.wdt_gnd_left.parameter_size ACUHkTable.wdt_gnd_left.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset: if ACUTestProcedure.all or ACUTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id) command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.ping: if ACUTestProcedure.all or ACUTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data) command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_temperature3: if ACUTestProcedure.all or ACUTestProcedure.read_temperature3:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading temperature 3"))
command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address, command = pack_get_param_command(object_id, TableIds.hk, ACUHkTable.temperature3.parameter_address,
ACUHkTable.temperature3.parameter_size) ACUHkTable.temperature3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vboost: if ACUTestProcedure.all or ACUTestProcedure.read_vboost:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vboost value"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address, command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vboost.parameter_address,
ACUConfigTable.vboost.parameter_size) ACUConfigTable.vboost.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi: if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_hi:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_hi"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address, command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_hi.parameter_address,
ACUConfigTable.vbat_max_hi.parameter_size) ACUConfigTable.vbat_max_hi.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo: if ACUTestProcedure.all or ACUTestProcedure.read_vbat_max_lo:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading vbat_max_lo"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address, command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.vbat_max_lo.parameter_address,
ACUConfigTable.vbat_max_lo.parameter_size) ACUConfigTable.vbat_max_lo.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode: if ACUTestProcedure.all or ACUTestProcedure.read_ov_mode:
tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode")) tc_queue.appendleft((QueueCommands.PRINT, "ACU: Reading ov_mode"))
command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address, command = pack_get_param_command(object_id, TableIds.config, ACUConfigTable.ov_mode.parameter_address,
ACUConfigTable.ov_mode.parameter_size) ACUConfigTable.ov_mode.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning off ACU"))
command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address, command = pack_set_param_command(p60dock_object_id, P60DockConfigTable.out_en_0.parameter_address,
P60DockConfigTable.out_en_0.parameter_size, Channel.off) P60DockConfigTable.out_en_0.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue

22
pus_tc/gps.py Normal file
View File

@ -0,0 +1,22 @@
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID
class GpsOpCodes(enum.Enum):
RESET_GNSS = "5"
def pack_gps_command(object_id: bytes, tc_queue: TcQueueT, op_code: str):
if op_code == GpsOpCodes.RESET_GNSS.value:
if object_id == GPS_HANDLER_0_ID:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 0"))
elif object_id == GPS_HANDLER_1_ID:
tc_queue.appendleft((QueueCommands.PRINT, "Resetting GPS device 1"))
cmd = generate_action_command(object_id=object_id, action_id=int(op_code))
tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -61,25 +61,27 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
if op_code == "1": if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 on"))
parameter = 0 # set channel off command = pack_set_param_command(
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address, object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.on) P60DockConfigTable.out_en_9.parameter_size, Channel.on
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) )
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue
if op_code == "2": if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Turning stack 3V3 off"))
parameter = 0 # set channel off command = pack_set_param_command(
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_9.parameter_address, object_id, P60DockConfigTable.out_en_9.parameter_address,
P60DockConfigTable.out_en_9.parameter_size, Channel.off) P60DockConfigTable.out_en_9.parameter_size, Channel.off
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) )
# command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue
if P60DockTestProcedure.all or P60DockTestProcedure.reboot: if P60DockTestProcedure.all or P60DockTestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reboot")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reboot"))
command = pack_reboot_command(object_id) command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt: if P60DockTestProcedure.all or P60DockTestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Reading ground watchdog timer value"))
@ -87,25 +89,25 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address, object_id, TableIds.hk, P60DockHkTable.wdt_gnd_left.parameter_address,
P60DockHkTable.wdt_gnd_left.parameter_size P60DockHkTable.wdt_gnd_left.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset: if P60DockTestProcedure.all or P60DockTestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing ground watchdog reset")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id) command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.ping: if P60DockTestProcedure.all or P60DockTestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Ping")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Ping"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data) command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off: if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 off")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 off"))
parameter = 0 # set channel off parameter = 0 # set channel off
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter) P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1: if P60DockTestProcedure.all or P60DockTestProcedure.read_temperature1:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing temperature reading")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing temperature reading"))
@ -113,14 +115,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size P60DockHkTable.temperature1.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing Output Channel 3 state (PDU2)")) "P60 Dock: Testing Output Channel 3 state (PDU2)"))
command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address, command = pack_get_param_command(object_id, TableIds.config, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size) P60DockConfigTable.out_en_3.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0: if P60DockTestProcedure.all or P60DockTestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft((QueueCommands.PRINT,
@ -129,14 +131,14 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address, object_id, TableIds.config, P60DockConfigTable.cur_lu_lim_0.parameter_address,
P60DockConfigTable.cur_lu_lim_0.parameter_size P60DockConfigTable.cur_lu_lim_0.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on: if P60DockTestProcedure.all or P60DockTestProcedure.channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 on")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing setting output channel 3 on"))
parameter = 1 # set channel on parameter = 1 # set channel on
command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address, command = pack_set_param_command(object_id, P60DockConfigTable.out_en_3.parameter_address,
P60DockConfigTable.out_en_3.parameter_size, parameter) P60DockConfigTable.out_en_3.parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test: if P60DockTestProcedure.all or P60DockTestProcedure.invalid_table_id_test:
tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid table id handling")) tc_queue.appendleft((QueueCommands.PRINT, "P60 Dock: Testing invalid table id handling"))
@ -145,7 +147,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address, object_id, table_id_invalid, P60DockHkTable.temperature1.parameter_address,
P60DockHkTable.temperature1.parameter_size P60DockHkTable.temperature1.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test: if P60DockTestProcedure.all or P60DockTestProcedure.invalid_address_test:
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft((QueueCommands.PRINT,
@ -153,7 +155,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
invalid_address = bytearray([0x01, 0xF4]) invalid_address = bytearray([0x01, 0xF4])
command = pack_get_param_command(object_id, TableIds.hk, invalid_address, command = pack_get_param_command(object_id, TableIds.hk, invalid_address,
P60DockHkTable.temperature1.parameter_size) P60DockHkTable.temperature1.parameter_size)
command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=29, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.PRINT, tc_queue.appendleft((QueueCommands.PRINT,
"P60 Dock: Testing invalid address handling in set param command")) "P60 Dock: Testing invalid address handling in set param command"))
@ -161,7 +163,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
parameter_size = 2 parameter_size = 2
parameter = 1 parameter = 1
command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter) command = pack_set_param_command(object_id, invalid_address, parameter_size, parameter)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test: if P60DockTestProcedure.all or P60DockTestProcedure.invalid_parameter_size_test:
tc_queue.appendleft( tc_queue.appendleft(
@ -172,7 +174,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
command = pack_get_param_command( command = pack_get_param_command(
object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size object_id, TableIds.hk, P60DockHkTable.temperature1.parameter_address, invalid_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft( tc_queue.appendleft(
(QueueCommands.PRINT, (QueueCommands.PRINT,
@ -182,7 +184,7 @@ def pack_p60dock_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: st
command = pack_set_param_command( command = pack_set_param_command(
object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter object_id, P60DockConfigTable.out_en_3.parameter_address, invalid_size, parameter
) )
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) # command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue

View File

@ -6,6 +6,7 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
import enum
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
@ -15,6 +16,14 @@ from pus_tc.p60dock import P60DockConfigTable
from gomspace.gomspace_pdu_definitions import * from gomspace.gomspace_pdu_definitions import *
class Pdu1OpCodes(enum.Enum):
STAR_TRACKER_ON = "1"
STAR_TRACKER_OFF = "4"
ACS_A_SIDE_ON = "6"
ACS_A_SIDE_OFF = "7"
PRINT_SWITCH_STATE = "17"
class PDU1TestProcedure: class PDU1TestProcedure:
""" """
@brief Use this class to define the tests to perform for the PDU2. @brief Use this class to define the tests to perform for the PDU2.
@ -31,51 +40,54 @@ class PDU1TestProcedure:
turn_channel_3_off = False turn_channel_3_off = False
def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str):
tc_queue.appendleft((QueueCommands.PRINT, "Commanding PDU1")) tc_queue.appendleft((QueueCommands.PRINT, "Commanding PDU1"))
if op_code == "1": if op_code == Pdu1OpCodes.STAR_TRACKER_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on) PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3": if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.on) PDUConfigTable.out_en_4.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4": if op_code == Pdu1OpCodes.STAR_TRACKER_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn star tracker off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off) PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5": if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn SUS nominal off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_4.parameter_address,
PDUConfigTable.out_en_4.parameter_size, Channel.off) PDUConfigTable.out_en_4.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6": if op_code == Pdu1OpCodes.ACS_A_SIDE_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_7.parameter_size, Channel.on) object_id, PDUConfigTable.out_en_7.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=34, app_data=command) PDUConfigTable.out_en_7.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if op_code == "7": if op_code == Pdu1OpCodes.ACS_A_SIDE_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn ACS Side A off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_7.parameter_size, Channel.off) object_id, PDUConfigTable.out_en_7.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=35, app_data=command) PDUConfigTable.out_en_7.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == Pdu1OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_STATE
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.ping: if PDU1TestProcedure.all or PDU1TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data) command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature: if PDU1TestProcedure.all or PDU1TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Testing temperature reading"))
@ -83,29 +95,32 @@ def pack_pdu1_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str)
object_id, TableIds.hk, PDUHkTable.temperature.parameter_address, object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
PDUHkTable.temperature.parameter_size PDUHkTable.temperature.parameter_size
) )
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on: if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 on (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_2.parameter_size, Channel.on) object_id, PDUConfigTable.out_en_2.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) PDUConfigTable.out_en_2.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off: if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 2 off (Star Tracker)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_2.parameter_size, Channel.off) object_id, PDUConfigTable.out_en_2.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) PDUConfigTable.out_en_2.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on: if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 on (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_3.parameter_size, Channel.on) object_id, PDUConfigTable.out_en_3.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command) PDUConfigTable.out_en_3.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off: if PDU1TestProcedure.all or PDU1TestProcedure.turn_channel_3_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)")) tc_queue.appendleft((QueueCommands.PRINT, "PDU1: Turn channel 3 off (MTQ)"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_3.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_3.parameter_size, Channel.off) object_id, PDUConfigTable.out_en_3.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=33, app_data=command) PDUConfigTable.out_en_3.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())

View File

@ -8,10 +8,14 @@
""" """
from tmtccmd.config.definitions import QueueCommands from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT from tmtccmd.tc.packer import TcQueueT
from tmtccmd.ecss.tc import PusTelecommand
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from gomspace.gomspace_pdu_definitions import * from gomspace.gomspace_pdu_definitions import *
from pus_tc.p60dock import P60DockConfigTable
class Pdu2OpCodes(enum.Enum):
ACS_SIDE_B_ON = "1"
ACS_SIDE_B_OFF = "2"
PRINT_SWITCH_STATE = "17"
class PDU2TestProcedure: class PDU2TestProcedure:
@ -26,7 +30,7 @@ class PDU2TestProcedure:
gnd_wdt_reset = False gnd_wdt_reset = False
ping = False ping = False
channel_2_off = False # Reaction wheels 5V channel_2_off = False # Reaction wheels 5V
read_temperature = True read_temperature = False
read_channel_2_state = False # Reaction wheels 5V read_channel_2_state = False # Reaction wheels 5V
read_cur_lu_lim_0 = False # OBC read_cur_lu_lim_0 = False # OBC
channel_2_on = False # Reaction wheels 5V channel_2_on = False # Reaction wheels 5V
@ -39,77 +43,89 @@ class PDU2TestProcedure:
def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT: def pack_pdu2_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code: str) -> TcQueueT:
tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2")) tc_queue.appendleft((QueueCommands.PRINT, "Testing PDU2"))
if op_code == "1": if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B on"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_7.parameter_size, Channel.on) object_id, PDUConfigTable.out_en_7.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=30, app_data=command) PDUConfigTable.out_en_7.parameter_size, Channel.on
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue
if op_code == "2": if op_code == Pdu2OpCodes.ACS_SIDE_B_OFF.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Turn ACS Side B off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_7.parameter_address, command = pack_set_param_command(
PDUConfigTable.out_en_7.parameter_size, Channel.off) object_id, PDUConfigTable.out_en_7.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=31, app_data=command) PDUConfigTable.out_en_7.parameter_size, Channel.off
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue
if op_code == Pdu2OpCodes.PRINT_SWITCH_STATE.value:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Print Switch Status"))
command = generate_action_command(
object_id=object_id, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_STATE
)
tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.reboot: if PDU2TestProcedure.all or PDU2TestProcedure.reboot:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reboot")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reboot"))
command = pack_reboot_command(object_id) command = pack_reboot_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt: if PDU2TestProcedure.all or PDU2TestProcedure.read_gnd_wdt:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading ground watchdog timer value"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address, command = pack_get_param_command(
PDUHkTable.wdt_gnd_left.parameter_size) object_id, TableIds.hk, PDUHkTable.wdt_gnd_left.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=20, app_data=command) PDUHkTable.wdt_gnd_left.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset: if PDU2TestProcedure.all or PDU2TestProcedure.gnd_wdt_reset:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing ground watchdog reset")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing ground watchdog reset"))
command = pack_gnd_wdt_reset_command(object_id) command = pack_gnd_wdt_reset_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=21, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.ping: if PDU2TestProcedure.all or PDU2TestProcedure.ping:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Ping Test")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Ping Test"))
ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) ping_data = bytearray([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
command = pack_ping_command(object_id, ping_data) command = pack_ping_command(object_id, ping_data)
command = PusTelecommand(service=8, subservice=128, ssc=22, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on: if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_on:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)")) tc_queue.appendleft(
(QueueCommands.PRINT, "PDU2: Testing setting output channel 2 on (TCS Heater)")
)
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.on) PDUConfigTable.out_en_2.parameter_size, Channel.on)
command = PusTelecommand(service=8, subservice=128, ssc=23, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature: if PDU2TestProcedure.all or PDU2TestProcedure.read_temperature:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing temperature reading"))
command = pack_get_param_command(object_id, TableIds.hk, PDUHkTable.temperature.parameter_address, command = pack_get_param_command(
PDUHkTable.temperature.parameter_size) object_id, TableIds.hk, PDUHkTable.temperature.parameter_address,
command = PusTelecommand(service=8, subservice=128, ssc=24, app_data=command) PDUHkTable.temperature.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state: if PDU2TestProcedure.all or PDU2TestProcedure.read_channel_2_state:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)")) tc_queue.appendleft(
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address, (QueueCommands.PRINT, "PDU2: Reading output channel 2 state (TCS Heater)")
PDUConfigTable.out_en_2.parameter_size) )
command = PusTelecommand(service=8, subservice=128, ssc=25, app_data=command) command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0: if PDU2TestProcedure.all or PDU2TestProcedure.read_cur_lu_lim_0:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)")) tc_queue.appendleft(
command = pack_get_param_command(object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address, (QueueCommands.PRINT, "PDU2: Reading current limit value of output channel 0 (OBC)")
PDUConfigTable.cur_lu_lim_0.parameter_size) )
command = PusTelecommand(service=8, subservice=128, ssc=26, app_data=command) command = pack_get_param_command(
object_id, TableIds.config, PDUConfigTable.cur_lu_lim_0.parameter_address,
PDUConfigTable.cur_lu_lim_0.parameter_size
)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off: if PDU2TestProcedure.all or PDU2TestProcedure.channel_2_off:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Testing setting output channel 2 off"))
command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address, command = pack_set_param_command(object_id, PDUConfigTable.out_en_2.parameter_address,
PDUConfigTable.out_en_2.parameter_size, Channel.off) PDUConfigTable.out_en_2.parameter_size, Channel.off)
command = PusTelecommand(service=8, subservice=128, ssc=27, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
if PDU2TestProcedure.all or PDU2TestProcedure.request_hk_table: if PDU2TestProcedure.all or PDU2TestProcedure.request_hk_table:
tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting housekeeping table")) tc_queue.appendleft((QueueCommands.PRINT, "PDU2: Requesting housekeeping table"))
command = pack_request_full_hk_table_command(object_id) command = pack_request_full_hk_table_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=28, app_data=command)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
return tc_queue return tc_queue

View File

@ -1,9 +1,5 @@
"""Hook function which packs telecommands based on service and operation code string
""" """
@brief This file transfers control of TC packing to the user
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import os import os
from collections import deque from collections import deque
from typing import Union from typing import Union
@ -30,10 +26,13 @@ from pus_tc.ploc_upater import pack_ploc_updater_test_into
from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd from pus_tc.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
from pus_tc.core import pack_core_commands from pus_tc.core import pack_core_commands
from pus_tc.star_tracker import pack_star_tracker_commands_into from pus_tc.star_tracker import pack_star_tracker_commands_into
from pus_tc.gps import pack_gps_command
from pus_tc.acs import pack_acs_command
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, ACU_HANDLER_ID, \ from config.object_ids import P60_DOCK_HANDLER, PDU_1_HANDLER_ID, PDU_2_HANDLER_ID, \
TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, \ ACU_HANDLER_ID, TMP_1075_1_HANDLER_ID, TMP_1075_2_HANDLER_ID, HEATER_ID, IMTQ_HANDLER_ID, \
RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID PLOC_MPSOC_ID, RW1_ID, RW2_ID, RW3_ID, RW4_ID, RAD_SENSOR_ID, PLOC_SUPV_ID, PLOC_UPDATER_ID, \
STAR_TRACKER_ID, PLOC_MEMORY_DUMPER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -87,21 +86,37 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, service_queu
return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_single_rw_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.RAD_SENSOR.value: if service == CustomServiceList.RAD_SENSOR.value:
object_id = RAD_SENSOR_ID object_id = RAD_SENSOR_ID
return pack_rad_sensor_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_rad_sensor_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.PLOC_SUPV.value: if service == CustomServiceList.PLOC_SUPV.value:
object_id = PLOC_SUPV_ID object_id = PLOC_SUPV_ID
return pack_ploc_supv_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_ploc_supv_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.PLOC_UPDATER.value: if service == CustomServiceList.PLOC_UPDATER.value:
object_id = PLOC_UPDATER_ID object_id = PLOC_UPDATER_ID
return pack_ploc_updater_test_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_ploc_updater_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.STAR_TRACKER.value: if service == CustomServiceList.STAR_TRACKER.value:
object_id = STAR_TRACKER_ID object_id = STAR_TRACKER_ID
return pack_star_tracker_commands_into(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_star_tracker_commands_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.CORE.value: if service == CustomServiceList.CORE.value:
return pack_core_commands(tc_queue=service_queue, op_code=op_code) return pack_core_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.PLOC_MEMORY_DUMPER.value: if service == CustomServiceList.PLOC_MEMORY_DUMPER.value:
object_id = PLOC_MEMORY_DUMPER_ID object_id = PLOC_MEMORY_DUMPER_ID
return pack_ploc_memory_dumper_cmd(object_id=object_id, tc_queue=service_queue, op_code=op_code) return pack_ploc_memory_dumper_cmd(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.ACS.value:
return pack_acs_command(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.GPS_0.value:
return pack_gps_command(object_id=GPS_HANDLER_0_ID, tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.GPS_1.value:
return pack_gps_command(object_id=GPS_HANDLER_1_ID, tc_queue=service_queue, op_code=op_code)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")

View File

@ -27,16 +27,16 @@ def pus_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter):
service_type = raw_tm_packet[7] service_type = raw_tm_packet[7]
tm_packet = None tm_packet = None
if service_type == 1: if service_type == 1:
tm_packet = Service1TM(raw_tm_packet) tm_packet = Service1TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 3: if service_type == 3:
tm_packet = Service3TM(raw_tm_packet) tm_packet = Service3TM.unpack(raw_telemetry=raw_tm_packet, custom_hk_handling=False)
if service_type == 5: if service_type == 5:
tm_packet = Service5TM(raw_tm_packet) tm_packet = Service5TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 8: if service_type == 8:
tm_packet = Service8TM(raw_tm_packet) tm_packet = Service8TM.unpack(raw_telemetry=raw_tm_packet)
if service_type == 17: if service_type == 17:
tm_packet = Service17TM(raw_tm_packet) tm_packet = Service17TM.unpack(raw_telemetry=raw_tm_packet)
if tm_packet is None: if tm_packet is None:
LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory') LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory')
tm_packet = PusTelemetry(raw_tm_packet) tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet)
tmtc_printer.print_telemetry(packet=tm_packet) tmtc_printer.print_telemetry(packet_if=tm_packet, info_if=tm_packet, print_raw_tm=False)

View File

@ -1,40 +1,22 @@
""" """HK Handling for EIVE OBSW"""
@brief This file transfers control of housekeeping handling (PUS service 3) to the
developer
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import struct import struct
import os
import datetime
from typing import Tuple from typing import Tuple
from tmtccmd.tm.service_3_housekeeping import Service3Base from tmtccmd.tm.service_3_housekeeping import Service3Base
from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.logger import get_console_logger
from pus_tc.syrlinks_hk_handler import SetIds from pus_tc.syrlinks_hk_handler import SetIds
from pus_tc.imtq import ImtqSetIds from pus_tc.imtq import ImtqSetIds
from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID from config.object_ids import SYRLINKS_HANDLER, IMTQ_HANDLER_ID, GPS_HANDLER_0_ID, GPS_HANDLER_1_ID
LOGGER = get_console_logger() LOGGER = get_console_logger()
def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray, def handle_user_hk_packet(
service3_packet: Service3Base) -> Tuple[list, list, bytearray, int]: object_id: bytes, set_id: int, hk_data: bytearray,
""" service3_packet: Service3Base
This function is called when a Service 3 Housekeeping packet is received. ) -> Tuple[list, list, bytearray, int]:
"""This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param set_id:
@param hk_data:
@param service3_packet:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
""" """
if object_id == SYRLINKS_HANDLER: if object_id == SYRLINKS_HANDLER:
if set_id == SetIds.RX_REGISTERS_DATASET: if set_id == SetIds.RX_REGISTERS_DATASET:
@ -50,6 +32,8 @@ def handle_user_hk_packet(object_id: bytes, set_id: int, hk_data: bytearray,
else: else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id") LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0 return [], [], bytearray(), 0
elif object_id == GPS_HANDLER_0_ID or object_id == GPS_HANDLER_1_ID:
return handle_gps_data(hk_data=hk_data)
else: else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0 return [], [], bytearray(), 0
@ -90,16 +74,21 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
hk_header = [] hk_header = []
hk_content = [] hk_content = []
validity_buffer = bytearray() validity_buffer = bytearray()
hk_header = ["Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]", "Init Cal Mag X [nT]", hk_header = [
"Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]", "Init Coil X Current [mA]", "Init Coil Y Current [mA]", "Init Err", "Init Raw Mag X [nT]", "Init Raw Mag Y [nT]", "Init Raw Mag Z [nT]",
"Init Coil Z Current [mA]", "Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]", "Init Cal Mag X [nT]", "Init Cal Mag Y [nT]", "Init Cal Mag Z [nT]",
"Init Coil X Current [mA]", "Init Coil Y Current [mA]", "Init Coil Z Current [mA]",
"Init Coil X Temperature [°C]", "Init Coil Y Temperature [°C]",
"Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]", "Init Coil Z Temperature [°C]", "Err", "Raw Mag X [nT]", "Raw Mag Y [nT]", "Raw Mag Z [nT]",
"Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]", "Coil Y Current [mA]", "Cal Mag X [nT]", "Cal Mag Y [nT]", "Cal Mag Z [nT]", "Coil X Current [mA]",
"Coil Z Current [mA]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]", "Coil Y Current [mA]", "Coil Z Current [mA]",
"Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]", "Fina Cal Mag X [nT]", "Coil X Temperature [°C]", "Coil Y Temperature [°C]", "Coil Z Temperature [°C]",
"Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]", "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", "Fina Err", "Fina Raw Mag X [nT]", "Fina Raw Mag Y [nT]", "Fina Raw Mag Z [nT]",
"Fina Coil Z Current [mA]", "Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]", "Fina Cal Mag X [nT]", "Fina Cal Mag Y [nT]", "Fina Cal Mag Z [nT]",
"Fina Coil Z Temperature [°C]"] "Fina Coil X Current [mA]", "Fina Coil Y Current [mA]", "Fina Coil Z Current [mA]",
"Fina Coil X Temperature [°C]", "Fina Coil Y Temperature [°C]",
"Fina Coil Z Temperature [°C]"
]
# INIT step (no coil actuation) # INIT step (no coil actuation)
init_err = hk_data[0] init_err = hk_data[0]
init_raw_mag_x = struct.unpack('!f', hk_data[1:5])[0] init_raw_mag_x = struct.unpack('!f', hk_data[1:5])[0]
@ -145,13 +134,65 @@ def handle_self_test_data(hk_data: bytearray) -> Tuple[list, list, bytearray, in
fina_coil_y_temperature = struct.unpack('!H', hk_data[125:127])[0] fina_coil_y_temperature = struct.unpack('!H', hk_data[125:127])[0]
fina_coil_z_temperature = struct.unpack('!H', hk_data[127:129])[0] fina_coil_z_temperature = struct.unpack('!H', hk_data[127:129])[0]
hk_content = [init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y, hk_content = [
init_err, init_raw_mag_x, init_raw_mag_y, init_raw_mag_z, init_cal_mag_x, init_cal_mag_y,
init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current, init_cal_mag_z, init_coil_x_current, init_coil_y_current, init_coil_z_current,
init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err, raw_mag_x, init_coil_x_temperature, init_coil_y_temperature, init_coil_z_temperature, err,
init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z, coil_x_current, coil_y_current, raw_mag_x, init_raw_mag_y, raw_mag_z, cal_mag_x, cal_mag_y, cal_mag_z,
coil_z_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err, fina_raw_mag_x, coil_x_current, coil_y_current, coil_z_current,
fina_raw_mag_y, fina_raw_mag_z, fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z, fina_coil_x_current, coil_x_temperature, coil_y_temperature, coil_z_temperature, fina_err,
fina_coil_y_current, fina_coil_z_current, fina_coil_x_temperature, fina_coil_y_temperature, fina_raw_mag_x, fina_raw_mag_y, fina_raw_mag_z,
fina_coil_z_temperature] fina_cal_mag_x, fina_cal_mag_y, fina_cal_mag_z,
fina_coil_x_current, fina_coil_y_current, fina_coil_z_current,
fina_coil_x_temperature, fina_coil_y_temperature, fina_coil_z_temperature
]
return hk_header, hk_content, validity_buffer, len(hk_header) return hk_header, hk_content, validity_buffer, len(hk_header)
def handle_gps_data(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
LOGGER.info(f'Received GPS data, HK data length {len(hk_data)}')
var_index = 0
header_array = []
content_array = []
latitude = struct.unpack('!d', hk_data[0:8])[0]
header_array.append('Latitude')
content_array.append(latitude)
longitude = struct.unpack('!d', hk_data[8:16])[0]
header_array.append('Longitude')
content_array.append(longitude)
altitude = struct.unpack('!d', hk_data[16:24])[0]
header_array.append('Altitude')
content_array.append(altitude)
fix_mode = hk_data[24]
header_array.append('Fix Mode')
content_array.append(fix_mode)
sat_in_use = hk_data[25]
header_array.append('Sats in Use')
content_array.append(sat_in_use)
year = struct.unpack('!H', hk_data[26:28])[0]
month = hk_data[28]
day = hk_data[29]
hours = hk_data[30]
minutes = hk_data[31]
seconds = hk_data[32]
header_array.append('Date')
date_string = f'{day}.{month}.{year} {hours}:{minutes}:{seconds}'
content_array.append(date_string)
unix_seconds = struct.unpack('!I', hk_data[33:37])[0]
header_array.append('Unix Seconds')
content_array.append(unix_seconds)
var_index += 13
if not os.path.isfile('gps_log.txt'):
with open("gps_log.txt", "w") as gps_file:
gps_file.write(
'Time, Latitude [deg], Longitude [deg], Altitude [m], Fix Mode, Sats in Use, '
'Date, Unix Seconds\n'
)
with open("gps_log.txt", "a") as gps_file:
gps_file.write(
f'{datetime.datetime.now()}, {latitude}, {longitude}, {altitude}, '
f'{fix_mode}, {sat_in_use}, {date_string}, {unix_seconds}\n'
)
validity_buffer = hk_data[37:39]
return header_array, content_array, validity_buffer, var_index

@ -1 +1 @@
Subproject commit 94978512deb6c91a6a1456ae3a1182bf679f063a Subproject commit 38be11cab16c036fb0968f01d46d198be4cd058f