syrlinks hk handler test

This commit is contained in:
Jakob Meier 2021-03-01 12:14:04 +01:00
parent cf572c311c
commit 81cd88f521
7 changed files with 41 additions and 14 deletions

View File

@ -36,6 +36,7 @@ class ServiceList(enum.Enum):
TMP1075_2 = auto() TMP1075_2 = auto()
HEATER = auto() HEATER = auto()
SA_DEPL = auto() SA_DEPL = auto()
SYRLINKS = auto()
class SerialConfig(enum.Enum): class SerialConfig(enum.Enum):

View File

@ -162,6 +162,8 @@ def add_globals_post_args_parsing(args: argparse.Namespace):
service = ServiceList.HEATER service = ServiceList.HEATER
elif service == "sa_depl": elif service == "sa_depl":
service = ServiceList.SA_DEPL service = ServiceList.SA_DEPL
elif service == "syrlinks":
service = ServiceList.SYRLINKS
else: else:
logger.warning("Service not known! Setting standard service 17") logger.warning("Service not known! Setting standard service 17")
service = ServiceList.SERVICE_17 service = ServiceList.SERVICE_17

View File

@ -21,6 +21,7 @@ from config.tmtcc_object_ids import ObjectIds
from pus_tc.tmtcc_tc_tmp1075 import pack_tmp1075_test_into from pus_tc.tmtcc_tc_tmp1075 import pack_tmp1075_test_into
from pus_tc.tmtcc_tc_heater import pack_heater_test_into from pus_tc.tmtcc_tc_heater import pack_heater_test_into
from pus_tc.tmtcc_tc_solar_array_deployment import pack_solar_array_deployment_test_into from pus_tc.tmtcc_tc_solar_array_deployment import pack_solar_array_deployment_test_into
from pus_tc.tmtcc_tc_syrlinks_hk_handler import pack_syrlinks_hk_handler_test_into
LOGGER = get_logger() LOGGER = get_logger()
@ -54,8 +55,11 @@ def pack_service_queue_user(service: ServiceList, op_code: str, service_queue: T
object_id = get_object_id(ObjectIds.HEATER) object_id = get_object_id(ObjectIds.HEATER)
return pack_heater_test_into(object_id, service_queue) return pack_heater_test_into(object_id, service_queue)
if service == ServiceList.SA_DEPL: if service == ServiceList.SA_DEPL:
object_id = get_object_id(ObjectIds.SOLAR_ARRAY_DEPLOYMENT_HANDLER) object_id = get_object_id(ObjectIds.SOLAR_ARRAY_DEPLOYMENT)
return pack_solar_array_deployment_test_into(object_id, service_queue) return pack_solar_array_deployment_test_into(object_id, service_queue)
if service == ServiceList.SYRLINKS:
object_id = get_object_id(ObjectIds.SYRLINKS_HK_HANDLER)
return pack_syrlinks_hk_handler_test_into(object_id, service_queue)
LOGGER.warning("Invalid Service !") LOGGER.warning("Invalid Service !")

View File

@ -9,12 +9,12 @@
from tmtc_core.core.tmtc_core_definitions import QueueCommands from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tm_service3_base import * from tmtc_core.pus_tc.tmtcc_tc_service_3_housekeeping import *
class SetIds: class SetIds:
RX_REGISTERS_DATASET = bytearray(0x0, 0x0, 0x0, 0x1) RX_REGISTERS_DATASET = 1
RX_REGISTERS_DATASET = bytearray(0x0, 0x0, 0x0, 0x2) TX_REGISTERS_DATASET = 2
def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: def pack_syrlinks_hk_handler_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:

View File

@ -11,6 +11,7 @@ from config.tmtcc_object_ids import ObjectIds
from tmtc_core.pus_tm.tmtcc_tm_service3_base import Service3Base from tmtc_core.pus_tm.tmtcc_tm_service3_base import Service3Base
from tmtc_core.utility.tmtcc_logger import get_logger from tmtc_core.utility.tmtcc_logger import get_logger
from config.tmtcc_object_ids import ObjectIds from config.tmtcc_object_ids import ObjectIds
from pus_tc.tmtcc_tc_syrlinks_hk_handler import SetIds
LOGGER = get_logger() LOGGER = get_logger()
@ -36,9 +37,14 @@ def handle_user_hk_packet(object_id: ObjectIds, set_id: int, hk_data: bytearray,
the corresponding values. The bytearray is the validity buffer, which is usually appended the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters. at the end of the housekeeping packet. The last value is the number of parameters.
""" """
if object_id == ObjectIds.SYRLINKS_HK_HANDLER.value: if object_id == ObjectIds.SYRLINKS_HK_HANDLER:
if set_id == if set_id == SetIds.RX_REGISTERS_DATASET:
return handle_syrlinks_hk_data(hk_data) return handle_syrlinks_rx_registers_dataset(hk_data)
elif set_id == SetIds.TX_REGISTERS_DATASET:
return handle_syrlinks_tx_registers_dataset(hk_data)
else:
LOGGER.info("Serive 3 TM: Syrlinks handler reply with unknown set id")
return [], [], bytearray(), 0
else: else:
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.") LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
return [], [], bytearray(), 0 return [], [], bytearray(), 0
@ -53,12 +59,23 @@ def handle_syrlinks_rx_registers_dataset(hk_data: bytearray) -> Tuple[list, list
rx_status = hk_data[0] rx_status = hk_data[0]
rx_sensitivity = struct.unpack('!I', hk_data[1:5]) rx_sensitivity = struct.unpack('!I', hk_data[1:5])
rx_frequency_shift = struct.unpack('!I', hk_data[5:9]) rx_frequency_shift = struct.unpack('!I', hk_data[5:9])
rx_iq_power = struct.unpack('!H', hk_data[9:13]) rx_iq_power = struct.unpack('!H', hk_data[9:11])
rx_agc_value = struct.unpack('!H', hk_data[13:17]) rx_agc_value = struct.unpack('!H', hk_data[11:13])
rx_demod_eb = struct.unpack('!I', hk_data[17:21]) rx_demod_eb = struct.unpack('!I', hk_data[13:17])
rx_demod_n0 = struct.unpack('!I', hk_data[21:25]) rx_demod_n0 = struct.unpack('!I', hk_data[17:21])
rx_data_rate = hk_data[25] rx_data_rate = hk_data[21]
hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0, hk_content = [rx_status, rx_sensitivity, rx_frequency_shift, rx_iq_power, rx_agc_value, rx_demod_eb, rx_demod_n0,
rx_data_rate] rx_data_rate]
validity_buffer.append(hk_data[26:])
return hk_header, hk_content, validity_buffer, 8 return hk_header, hk_content, validity_buffer, 8
def handle_syrlinks_tx_registers_dataset(hk_data: bytearray) -> Tuple[list, list, bytearray, int]:
hk_header = []
hk_content = []
validity_buffer = bytearray()
hk_header = ["TX Status", "TX Waveform", "TX AGC value"]
tx_status = hk_data[0]
tx_waveform = hk_data[1]
tx_agc_value = struct.unpack('!H', hk_data[2:4])
hk_content = [tx_status, tx_waveform, tx_agc_value]
return hk_header, hk_content, validity_buffer, 3

View File

@ -3,6 +3,7 @@
@details Template configuration file. Copy this folder to the TMTC commander root and adapt @details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs. it to your needs.
""" """
from tmtc_core.pus_tm.tmtcc_pus_service_3 import Service3TM
from tmtc_core.pus_tm.tmtcc_pus_service_8 import Service8TM from tmtc_core.pus_tm.tmtcc_pus_service_8 import Service8TM
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.tmtcc_logger import get_logger from tmtc_core.utility.tmtcc_logger import get_logger
@ -18,6 +19,8 @@ def tm_user_factory_hook(raw_tm_packet: bytearray) -> PusTelemetry:
service_type = raw_tm_packet[7] service_type = raw_tm_packet[7]
if service_type == 1: if service_type == 1:
return Service1TM(raw_tm_packet) return Service1TM(raw_tm_packet)
if service_type == 3:
return Service3TM(raw_tm_packet)
if service_type == 5: if service_type == 5:
return Service5TM(raw_tm_packet) return Service5TM(raw_tm_packet)
if service_type == 8: if service_type == 8:

@ -1 +1 @@
Subproject commit c51a2ba18817c0dc7053ee6d7e15a00ef344b0ab Subproject commit dd623a5ef72c46cdb99334bc7fd166e45bb2dd56