move CCSDS handler definitions

This commit is contained in:
Robin Müller 2023-01-26 11:31:26 +01:00
parent 8a7880bc35
commit c36c7ca5bb
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
2 changed files with 39 additions and 23 deletions

View File

@ -39,21 +39,6 @@ def add_pdec_cmds(defs: TmtcDefinitionWrapper):
defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce) defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce)
@tmtc_definitions_provider
def add_ccsds_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add("0", "CCSDS Handler: Set low rate")
oce.add("1", "CCSDS Handler: Set high rate")
oce.add("2", "CCSDS Handler: Enable transmitter")
oce.add("3", "CCSDS Handler: Disable transmitter")
oce.add("4", "CCSDS Handler: Set arbitrary bitrate")
oce.add("5", "CCSDS Handler: Enable tx clock manipulator")
oce.add("6", "CCSDS Handler: Disable tx clock manipulator")
oce.add("7", "CCSDS Handler: Update tx data on rising edge")
oce.add("8", "CCSDS Handler: Update tx data on falling edge")
defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce)
@tmtc_definitions_provider @tmtc_definitions_provider
def add_str_cmds(defs: TmtcDefinitionWrapper): def add_str_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()

View File

@ -8,7 +8,13 @@
import enum import enum
import struct import struct
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
OpCodeEntry,
TmtcDefinitionWrapper,
)
from tmtccmd.tc import DefaultPusQueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import create_mode_command, Mode from tmtccmd.tc.pus_200_fsfw_modes import create_mode_command, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -47,31 +53,40 @@ class Submode(enum.IntEnum):
class OpCode: class OpCode:
ENABLE_WITH_LOW_DATARATE = ["enable_low_datarate"] ENABLE_WITH_LOW_DATARATE = ["enable_low_datarate"]
ENABLE_WITH_HIGH_DATARATE = ["enable_high_datarate"] ENABLE_WITH_HIGH_DATARATE = ["enable_high_datarate"]
DISABLE_HIGH_DATARATE = ["disable"] DISABLE = ["disable"]
ENABLE_ACTION = ["legacy_enable_tx"] ENABLE_ACTION = ["legacy_enable_tx"]
DISABLE_ACTION = ["legacy_disable_tx"] DISABLE_ACTION = ["legacy_disable_tx"]
class Info:
ENABLE_WITH_LOW_DATARATE = "Enable TX with low datarate"
ENABLE_WITH_HIGH_DATARATE = "Enable TX with high datarate"
DISABLE = "Disable TX"
ENABLE_ACTION = "Enable TX (legacy)"
DISABLE_ACTION = "Disable TX (legacy)"
def pack_ccsds_handler_test( def pack_ccsds_handler_test(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
): ):
obyt = object_id.as_bytes obyt = object_id.as_bytes
prefix = "CCSDS Handler"
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code in OpCode.ENABLE_WITH_LOW_DATARATE: if op_code in OpCode.ENABLE_WITH_LOW_DATARATE:
q.add_log_cmd("CCSDS Handler: Set low rate") q.add_log_cmd(f"{prefix}: {Info.ENABLE_WITH_LOW_DATARATE}")
q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_LOW)) q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_LOW))
if op_code in OpCode.ENABLE_WITH_HIGH_DATARATE: if op_code in OpCode.ENABLE_WITH_HIGH_DATARATE:
q.add_log_cmd("CCSDS Handler: Set high rate") q.add_log_cmd(f"{prefix}: {Info.ENABLE_WITH_HIGH_DATARATE}")
q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_HIGH)) q.add_pus_tc(create_mode_command(obyt, Mode.ON, Submode.DATARATE_HIGH))
if op_code in OpCode.DISABLE_ACTION: if op_code in OpCode.DISABLE:
q.add_log_cmd("CCSDS Handler: Disable") q.add_log_cmd(f"{prefix}: {Info.DISABLE}")
q.add_pus_tc(create_mode_command(obyt, Mode.OFF, 0)) q.add_pus_tc(create_mode_command(obyt, Mode.OFF, 0))
if op_code in OpCode.ENABLE_ACTION: if op_code in OpCode.ENABLE_ACTION:
q.add_log_cmd("CCSDS Handler: Enables the transmitter") q.add_log_cmd(f"{prefix}: {Info.ENABLE_ACTION}")
command = obyt + struct.pack("!I", ActionId.EN_TRANSMITTER) command = obyt + struct.pack("!I", ActionId.EN_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "3": if op_code in OpCode.DISABLE_ACTION:
q.add_log_cmd("CCSDS Handler: Disables the transmitter") q.add_log_cmd(f"{prefix}: {Info.DISABLE_ACTION}")
command = obyt + struct.pack("!I", ActionId.DIS_TRANSMITTER) command = obyt + struct.pack("!I", ActionId.DIS_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "4": if op_code == "4":
@ -99,3 +114,19 @@ def pack_ccsds_handler_test(
q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock") q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock")
command = obyt + struct.pack("!I", ActionId.UPDATE_ON_FALLING_EDGE) command = obyt + struct.pack("!I", ActionId.UPDATE_ON_FALLING_EDGE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
@tmtc_definitions_provider
def add_ccsds_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(OpCode.ENABLE_WITH_LOW_DATARATE, Info.ENABLE_WITH_LOW_DATARATE)
oce.add(OpCode.ENABLE_WITH_HIGH_DATARATE, Info.ENABLE_WITH_HIGH_DATARATE)
oce.add(OpCode.DISABLE, Info.DISABLE)
oce.add(OpCode.ENABLE_ACTION, Info.ENABLE_ACTION)
oce.add(OpCode.DISABLE_ACTION, Info.DISABLE_ACTION)
oce.add("4", "CCSDS Handler: Set arbitrary bitrate")
oce.add("5", "CCSDS Handler: Enable tx clock manipulator")
oce.add("6", "CCSDS Handler: Disable tx clock manipulator")
oce.add("7", "CCSDS Handler: Update tx data on rising edge")
oce.add("8", "CCSDS Handler: Update tx data on falling edge")
defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce)