From c36c7ca5bbe12b74f096414383eedfc16576a9d2 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 26 Jan 2023 11:31:26 +0100 Subject: [PATCH] move CCSDS handler definitions --- eive_tmtc/pus_tc/cmd_definitions.py | 15 --------- eive_tmtc/tmtc/com/ccsds_handler.py | 47 ++++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/eive_tmtc/pus_tc/cmd_definitions.py b/eive_tmtc/pus_tc/cmd_definitions.py index dbb7d35..fb5baa1 100644 --- a/eive_tmtc/pus_tc/cmd_definitions.py +++ b/eive_tmtc/pus_tc/cmd_definitions.py @@ -39,21 +39,6 @@ def add_pdec_cmds(defs: TmtcDefinitionWrapper): defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce) -@tmtc_definitions_provider -def add_ccsds_cmds(defs: TmtcDefinitionWrapper): - oce = OpCodeEntry() - oce.add("0", "CCSDS Handler: Set low rate") - oce.add("1", "CCSDS Handler: Set high rate") - oce.add("2", "CCSDS Handler: Enable transmitter") - oce.add("3", "CCSDS Handler: Disable transmitter") - oce.add("4", "CCSDS Handler: Set arbitrary bitrate") - oce.add("5", "CCSDS Handler: Enable tx clock manipulator") - oce.add("6", "CCSDS Handler: Disable tx clock manipulator") - oce.add("7", "CCSDS Handler: Update tx data on rising edge") - oce.add("8", "CCSDS Handler: Update tx data on falling edge") - defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce) - - @tmtc_definitions_provider def add_str_cmds(defs: TmtcDefinitionWrapper): oce = OpCodeEntry() diff --git a/eive_tmtc/tmtc/com/ccsds_handler.py b/eive_tmtc/tmtc/com/ccsds_handler.py index 12a43b0..f9293dd 100644 --- a/eive_tmtc/tmtc/com/ccsds_handler.py +++ b/eive_tmtc/tmtc/com/ccsds_handler.py @@ -8,7 +8,13 @@ import enum import struct +from eive_tmtc.config.definitions import CustomServiceList from spacepackets.ecss.tc import PusTelecommand +from tmtccmd.config.tmtc import ( + tmtc_definitions_provider, + OpCodeEntry, + TmtcDefinitionWrapper, +) from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc.pus_200_fsfw_modes import create_mode_command, Mode from tmtccmd.util import ObjectIdU32 @@ -47,31 +53,40 @@ class Submode(enum.IntEnum): class OpCode: ENABLE_WITH_LOW_DATARATE = ["enable_low_datarate"] ENABLE_WITH_HIGH_DATARATE = ["enable_high_datarate"] - DISABLE_HIGH_DATARATE = ["disable"] + DISABLE = ["disable"] ENABLE_ACTION = ["legacy_enable_tx"] DISABLE_ACTION = ["legacy_disable_tx"] +class Info: + ENABLE_WITH_LOW_DATARATE = "Enable TX with low datarate" + ENABLE_WITH_HIGH_DATARATE = "Enable TX with high datarate" + DISABLE = "Disable TX" + ENABLE_ACTION = "Enable TX (legacy)" + DISABLE_ACTION = "Disable TX (legacy)" + + def pack_ccsds_handler_test( object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str ): obyt = object_id.as_bytes + prefix = "CCSDS Handler" q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") if op_code in OpCode.ENABLE_WITH_LOW_DATARATE: - q.add_log_cmd("CCSDS Handler: Set low rate") + q.add_log_cmd(f"{prefix}: {Info.ENABLE_WITH_LOW_DATARATE}") q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_LOW)) if op_code in OpCode.ENABLE_WITH_HIGH_DATARATE: - q.add_log_cmd("CCSDS Handler: Set high rate") + q.add_log_cmd(f"{prefix}: {Info.ENABLE_WITH_HIGH_DATARATE}") q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_HIGH)) - if op_code in OpCode.DISABLE_ACTION: - q.add_log_cmd("CCSDS Handler: Disable") + if op_code in OpCode.DISABLE: + q.add_log_cmd(f"{prefix}: {Info.DISABLE}") q.add_pus_tc(create_mode_command(obyt, Mode.OFF, 0)) if op_code in OpCode.ENABLE_ACTION: - q.add_log_cmd("CCSDS Handler: Enables the transmitter") + q.add_log_cmd(f"{prefix}: {Info.ENABLE_ACTION}") command = obyt + struct.pack("!I", ActionId.EN_TRANSMITTER) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) - if op_code == "3": - q.add_log_cmd("CCSDS Handler: Disables the transmitter") + if op_code in OpCode.DISABLE_ACTION: + q.add_log_cmd(f"{prefix}: {Info.DISABLE_ACTION}") command = obyt + struct.pack("!I", ActionId.DIS_TRANSMITTER) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) if op_code == "4": @@ -99,3 +114,19 @@ def pack_ccsds_handler_test( q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock") command = obyt + struct.pack("!I", ActionId.UPDATE_ON_FALLING_EDGE) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) + + +@tmtc_definitions_provider +def add_ccsds_cmds(defs: TmtcDefinitionWrapper): + oce = OpCodeEntry() + oce.add(OpCode.ENABLE_WITH_LOW_DATARATE, Info.ENABLE_WITH_LOW_DATARATE) + oce.add(OpCode.ENABLE_WITH_HIGH_DATARATE, Info.ENABLE_WITH_HIGH_DATARATE) + oce.add(OpCode.DISABLE, Info.DISABLE) + oce.add(OpCode.ENABLE_ACTION, Info.ENABLE_ACTION) + oce.add(OpCode.DISABLE_ACTION, Info.DISABLE_ACTION) + oce.add("4", "CCSDS Handler: Set arbitrary bitrate") + oce.add("5", "CCSDS Handler: Enable tx clock manipulator") + oce.add("6", "CCSDS Handler: Disable tx clock manipulator") + oce.add("7", "CCSDS Handler: Update tx data on rising edge") + oce.add("8", "CCSDS Handler: Update tx data on falling edge") + defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce)