Merge branch 'main' into mueller/add-acs-ss-cmds
This commit is contained in:
commit
09757f0150
@ -128,30 +128,6 @@ def add_str_cmds(defs: TmtcDefinitionWrapper):
|
|||||||
defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce)
|
defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce)
|
||||||
|
|
||||||
|
|
||||||
@tmtc_definitions_provider
|
|
||||||
def add_syrlinks_cmds(defs: TmtcDefinitionWrapper):
|
|
||||||
oce = OpCodeEntry()
|
|
||||||
oce.add("0", "Syrlinks Handler: Set mode off")
|
|
||||||
oce.add("1", "Syrlinks Handler: Set mode on")
|
|
||||||
oce.add("2", "Syrlinks Handler: Set mode normal")
|
|
||||||
oce.add("3", "Syrlinks Handler: Set TX standby")
|
|
||||||
oce.add("4", "Syrlinks Handler: Set TX modulation")
|
|
||||||
oce.add("5", "Syrlinks Handler: Set TX carrier wave")
|
|
||||||
oce.add("6", "Syrlinks Handler: Read TX status")
|
|
||||||
oce.add("7", "Syrlinks Handler: Read TX waveform")
|
|
||||||
oce.add("8", "Syrlinks Handler: Read TX AGC value high byte")
|
|
||||||
oce.add("9", "Syrlinks Handler: Read TX AGC value low byte")
|
|
||||||
oce.add("12", "Syrlinks Handler: Write LCL config")
|
|
||||||
oce.add("13", "Syrlinks Handler: Read RX status registers")
|
|
||||||
oce.add("14", "Syrlinks Handler: Read LCL config register")
|
|
||||||
oce.add("15", "Syrlinks Handler: Set waveform OQPSK")
|
|
||||||
oce.add("16", "Syrlinks Handler: Set waveform BPSK")
|
|
||||||
oce.add("17", "Syrlinks Handler: Set second config")
|
|
||||||
oce.add("18", "Syrlinks Handler: Enable debug output")
|
|
||||||
oce.add("19", "Syrlinks Handler: Disable debug output")
|
|
||||||
defs.add_service(CustomServiceList.SYRLINKS.value, "Syrlinks Handler", oce)
|
|
||||||
|
|
||||||
|
|
||||||
@tmtc_definitions_provider
|
@tmtc_definitions_provider
|
||||||
def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper):
|
def add_bpx_cmd_definitions(defs: TmtcDefinitionWrapper):
|
||||||
oce = OpCodeEntry()
|
oce = OpCodeEntry()
|
||||||
|
@ -5,6 +5,12 @@
|
|||||||
@author J. Meier
|
@author J. Meier
|
||||||
@date 13.12.2020
|
@date 13.12.2020
|
||||||
"""
|
"""
|
||||||
|
from config.definitions import CustomServiceList
|
||||||
|
from tmtccmd.config.tmtc import (
|
||||||
|
tmtc_definitions_provider,
|
||||||
|
TmtcDefinitionWrapper,
|
||||||
|
OpCodeEntry,
|
||||||
|
)
|
||||||
from tmtccmd.tc import DefaultPusQueueHelper
|
from tmtccmd.tc import DefaultPusQueueHelper
|
||||||
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
@ -20,7 +26,11 @@ class SetIds:
|
|||||||
|
|
||||||
|
|
||||||
class OpCodes:
|
class OpCodes:
|
||||||
NORMAL = ["2", "nml"]
|
OFF = "off"
|
||||||
|
ON = "on"
|
||||||
|
NORMAL = "nml"
|
||||||
|
STANDBY = "standby"
|
||||||
|
MODULATION = "modulation"
|
||||||
|
|
||||||
|
|
||||||
class CommandIds:
|
class CommandIds:
|
||||||
@ -41,28 +51,52 @@ class CommandIds:
|
|||||||
DISABLE_DEBUG = 21
|
DISABLE_DEBUG = 21
|
||||||
|
|
||||||
|
|
||||||
|
@tmtc_definitions_provider
|
||||||
|
def add_syrlinks_cmds(defs: TmtcDefinitionWrapper):
|
||||||
|
oce = OpCodeEntry()
|
||||||
|
oce.add(OpCodes.OFF, "Syrlinks Handler: Set mode off")
|
||||||
|
oce.add(OpCodes.ON, "Syrlinks Handler: Set mode on")
|
||||||
|
oce.add(OpCodes.NORMAL, "Syrlinks Handler: Set mode normal")
|
||||||
|
oce.add(OpCodes.STANDBY, "Syrlinks Handler: Set TX standby")
|
||||||
|
oce.add(OpCodes.MODULATION, "Syrlinks Handler: Set TX modulation")
|
||||||
|
oce.add("5", "Syrlinks Handler: Set TX carrier wave")
|
||||||
|
oce.add("6", "Syrlinks Handler: Read TX status")
|
||||||
|
oce.add("7", "Syrlinks Handler: Read TX waveform")
|
||||||
|
oce.add("8", "Syrlinks Handler: Read TX AGC value high byte")
|
||||||
|
oce.add("9", "Syrlinks Handler: Read TX AGC value low byte")
|
||||||
|
oce.add("12", "Syrlinks Handler: Write LCL config")
|
||||||
|
oce.add("13", "Syrlinks Handler: Read RX status registers")
|
||||||
|
oce.add("14", "Syrlinks Handler: Read LCL config register")
|
||||||
|
oce.add("15", "Syrlinks Handler: Set waveform OQPSK")
|
||||||
|
oce.add("16", "Syrlinks Handler: Set waveform BPSK")
|
||||||
|
oce.add("17", "Syrlinks Handler: Set second config")
|
||||||
|
oce.add("18", "Syrlinks Handler: Enable debug output")
|
||||||
|
oce.add("19", "Syrlinks Handler: Disable debug output")
|
||||||
|
defs.add_service(CustomServiceList.SYRLINKS.value, "Syrlinks Handler", oce)
|
||||||
|
|
||||||
|
|
||||||
def pack_syrlinks_command(
|
def pack_syrlinks_command(
|
||||||
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
|
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
|
||||||
):
|
):
|
||||||
obyt = object_id.as_bytes
|
obyt = object_id.as_bytes
|
||||||
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
|
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
|
||||||
if op_code == "0":
|
if op_code == OpCodes.OFF:
|
||||||
q.add_log_cmd("Syrlinks: Set mode off")
|
q.add_log_cmd("Syrlinks: Set mode off")
|
||||||
data = pack_mode_data(obyt, Modes.OFF, 0)
|
data = pack_mode_data(obyt, Modes.OFF, 0)
|
||||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
||||||
if op_code == "1":
|
if op_code == OpCodes.ON:
|
||||||
q.add_log_cmd("Syrlinks: Set mode on")
|
q.add_log_cmd("Syrlinks: Set mode on")
|
||||||
data = pack_mode_data(obyt, Modes.ON, 0)
|
data = pack_mode_data(obyt, Modes.ON, 0)
|
||||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
||||||
if op_code == "2":
|
if op_code == OpCodes.NORMAL:
|
||||||
q.add_log_cmd("Syrlinks: Mode Normal")
|
q.add_log_cmd("Syrlinks: Mode Normal")
|
||||||
data = pack_mode_data(obyt, Modes.NORMAL, 0)
|
data = pack_mode_data(obyt, Modes.NORMAL, 0)
|
||||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=data))
|
||||||
if op_code == "3":
|
if op_code == OpCodes.STANDBY:
|
||||||
q.add_log_cmd("syrlinks: Set TX mode standby")
|
q.add_log_cmd("syrlinks: Set TX mode standby")
|
||||||
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
|
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
|
||||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
||||||
if op_code == "4":
|
if op_code == OpCodes.MODULATION:
|
||||||
q.add_log_cmd("syrlinks: Set TX mode modulation")
|
q.add_log_cmd("syrlinks: Set TX mode modulation")
|
||||||
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
|
data = obyt + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
|
||||||
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=data))
|
||||||
|
4
tmtcc.py
4
tmtcc.py
@ -170,6 +170,8 @@ class PusHandler(SpecificApidHandlerBase):
|
|||||||
self.raw_logger = raw_logger
|
self.raw_logger = raw_logger
|
||||||
|
|
||||||
def handle_tm(self, packet: bytes, _user_args: any):
|
def handle_tm(self, packet: bytes, _user_args: any):
|
||||||
|
# with open("tc.bin", "wb") as of:
|
||||||
|
# of.write(packet)
|
||||||
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger)
|
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger)
|
||||||
|
|
||||||
|
|
||||||
@ -255,6 +257,8 @@ class TcHandler(TcHandlerBase):
|
|||||||
self.high_level_file_logger.info(
|
self.high_level_file_logger.info(
|
||||||
f"{get_current_time_string(True)}: {tc_info_string}"
|
f"{get_current_time_string(True)}: {tc_info_string}"
|
||||||
)
|
)
|
||||||
|
# with open("tc.bin", "wb") as of:
|
||||||
|
# of.write(raw_tc)
|
||||||
send_params.com_if.send(raw_tc)
|
send_params.com_if.send(raw_tc)
|
||||||
elif entry_helper.entry_type == TcQueueEntryType.CCSDS_TC:
|
elif entry_helper.entry_type == TcQueueEntryType.CCSDS_TC:
|
||||||
cfdp_packet_in_ccsds = entry_helper.to_space_packet_entry()
|
cfdp_packet_in_ccsds = entry_helper.to_space_packet_entry()
|
||||||
|
Loading…
Reference in New Issue
Block a user