api update done

This commit is contained in:
Robin Müller 2022-07-05 02:12:54 +02:00
parent 4a654857ce
commit e009167784
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
41 changed files with 547 additions and 1104 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ log
/gps_log.txt
/config/*.json
tmtc_conf.json
/seqcnt.txt

View File

@ -1,14 +0,0 @@
"""
@brief This file exposes hook functions to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
from spacepackets.ecss.tc import PusTelecommand
def command_preparation_hook() -> PusTelecommand:
"""
Can be used to pack user-defined commands by generating and returning a PusTelecommand
class instance
"""
return PusTelecommand(service=17, subservice=1, ssc=20)

View File

@ -5,7 +5,7 @@
"""
import enum
from tmtccmd.core.backend import TmTcHandler
from tmtccmd import CcsdsTmtcBackend
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
@ -15,5 +15,5 @@ class CustomModeList(enum.IntEnum):
pass
def custom_mode_operation(tmtc_backend: TmTcHandler, mode: int):
def custom_mode_operation(_tmtc_backend: CcsdsTmtcBackend, _mode: int):
pass

View File

@ -30,7 +30,7 @@ class EiveHookObject(TmTcCfgHookBase):
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
from config.custom_mode_op import custom_mode_operation
custom_mode_operation(mode=mode, tmtc_backend=tmtc_backend)
custom_mode_operation(tmtc_backend, mode)
def get_object_ids(self) -> ObjectIdDictT:
from config.object_ids import get_object_ids

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 2890c62e31999706ff1a7164b8a9f7cbc7bc34d5
Subproject commit 7835963dfebc95237eec791b07a8faaf9185ba0b

View File

@ -10,9 +10,9 @@ import enum
import struct
from typing import Union
from spacepackets.ecss import PusTelecommand
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.definitions import PusTelecommand
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class GomspaceDeviceActionIds(enum.IntEnum):
@ -138,7 +138,7 @@ def pack_set_param_command(
)
def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand:
def pack_ping_command(object_id: ObjectIdU32, data: bytearray) -> PusTelecommand:
""" " Function to generate the command to ping a gomspace device
@param object_id Object Id of the gomspace device handler.
@param data Bytearray containing the bytes to send to the gomspace device. For now the on board software
@ -153,7 +153,7 @@ def pack_ping_command(object_id: ObjectId, data: bytearray) -> PusTelecommand:
)
def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand:
def pack_gnd_wdt_reset_command(object_id: ObjectIdU32) -> PusTelecommand:
""" " Function to generate the command to reset the watchdog of a gomspace device.
@param object_id Object Id of the gomspace device handler.
"""
@ -162,7 +162,7 @@ def pack_gnd_wdt_reset_command(object_id: ObjectId) -> PusTelecommand:
)
def pack_reboot_command(object_id: ObjectId) -> PusTelecommand:
def pack_reboot_command(object_id: ObjectIdU32) -> PusTelecommand:
"""Function to generate the command which triggers a reboot of a gomspace device
@param object_id The object id of the gomspace device handler.
"""
@ -171,7 +171,7 @@ def pack_reboot_command(object_id: ObjectId) -> PusTelecommand:
)
def pack_request_full_hk_table_command(object_id: ObjectId) -> PusTelecommand:
def pack_request_full_hk_table_command(object_id: ObjectIdU32) -> PusTelecommand:
"""Function to generate the command to request the full housekeeping table from a gomspace
device.
@param object_id The object id of the gomspace device handler.

View File

@ -11,597 +11,364 @@ from pus_tc.devs.reaction_wheels import add_rw_cmds
from pus_tc.devs.bpx_batt import BpxOpCodes
from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.config.globals import get_default_tmtc_defs
def get_eive_service_op_code_dict() -> TmTcDefWrapper:
def_wrapper = get_default_tmtc_defs()
add_bpx_cmd_definitions(cmd_dict=def_wrapper)
add_bpx_cmd_definitions(defs=def_wrapper)
add_core_controller_definitions(defs=def_wrapper)
add_pl_pcdu_cmds(cmd_dict=def_wrapper)
add_pcdu_cmds(cmd_dict=def_wrapper)
specify_rtd_cmds(cmd_dict=def_wrapper)
add_imtq_cmds(cmd_dict=def_wrapper)
add_rad_sens_cmds(cmd_dict=def_wrapper)
add_rw_cmds(cmd_dict=def_wrapper)
add_ploc_mpsoc_cmds(cmd_dict=def_wrapper)
add_ploc_supv_cmds(cmd_dict=def_wrapper)
add_system_cmds(cmd_dict=def_wrapper)
add_time_cmds(cmd_dict=def_wrapper)
add_syrlinks_cmds(cmd_dict=def_wrapper)
add_gps_cmds(cmd_dict=def_wrapper)
add_str_cmds(cmd_dict=def_wrapper)
add_ccsds_cmds(cmd_dict=def_wrapper)
add_pdec_cmds(cmd_dict=def_wrapper)
add_heater_cmds(cmd_dict=def_wrapper)
add_tmp_sens_cmds(cmd_dict=def_wrapper)
add_proc_cmds(cmd_dict=def_wrapper)
add_pl_pcdu_cmds(defs=def_wrapper)
add_pcdu_cmds(defs=def_wrapper)
specify_rtd_cmds(defs=def_wrapper)
add_imtq_cmds(defs=def_wrapper)
add_rad_sens_cmds(defs=def_wrapper)
add_rw_cmds(defs=def_wrapper)
add_ploc_mpsoc_cmds(defs=def_wrapper)
add_ploc_supv_cmds(defs=def_wrapper)
add_system_cmds(defs=def_wrapper)
add_time_cmds(defs=def_wrapper)
add_syrlinks_cmds(defs=def_wrapper)
add_gps_cmds(defs=def_wrapper)
add_str_cmds(defs=def_wrapper)
add_ccsds_cmds(defs=def_wrapper)
add_pdec_cmds(defs=def_wrapper)
add_heater_cmds(defs=def_wrapper)
add_tmp_sens_cmds(defs=def_wrapper)
add_proc_cmds(defs=def_wrapper)
return def_wrapper
def add_tmp_sens_cmds(defs: TmTcDefWrapper):
op_code_dict = {
"0": ("TMP1075 Tests", {OpCodeDictKeys.TIMEOUT: 2.2}),
}
service_tuple = ("TMP1075 1", op_code_dict)
cmd_dict[CustomServiceList.TMP1075_1.value] = service_tuple
service_tuple = ("TMP1075 2", op_code_dict)
cmd_dict[CustomServiceList.TMP1075_2.value] = service_tuple
oce = OpCodeEntry()
oce.add("0", "TMP1075 Tests")
defs.add_service(CustomServiceList.TMP1075_1.value, "TMP1075 1", oce)
defs.add_service(CustomServiceList.TMP1075_2.value, "TMP1075 2", oce)
def add_pdec_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_pdec_handler = {
"0": ("PDEC Handler: Print CLCW", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDEC Handler: Print PDEC monitor", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler)
cmd_dict[CustomServiceList.PDEC_HANDLER.value] = service_pdec_handler_tuple
def add_pdec_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "PDEC Handler: Print CLCW")
oce.add("1", "PDEC Handler: Print PDEC monitor")
defs.add_service(CustomServiceList.PDEC_HANDLER.value, "PDEC Handler", oce)
def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_ccsds_handler = {
"0": ("CCSDS Handler: Set low rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("CCSDS Handler: Set high rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("CCSDS Handler: Set arbitrary bitrate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": (
"CCSDS Handler: Enable tx clock manipulator",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"6": (
"CCSDS Handler: Disable tx clock manipulator",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"7": (
"CCSDS Handler: Update tx data on rising edge",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"8": (
"CCSDS Handler: Update tx data on falling edge",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler)
cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
def add_ccsds_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "CCSDS Handler: Set low rate")
oce.add("1", "CCSDS Handler: Set high rate")
oce.add("2", "CCSDS Handler: Enable transmitter")
oce.add("3", "CCSDS Handler: Disable transmitter")
oce.add("4", "CCSDS Handler: Set arbitrary bitrate")
oce.add("5", "CCSDS Handler: Enable tx clock manipulator")
oce.add("6", "CCSDS Handler: Disable tx clock manipulator")
oce.add("7", "CCSDS Handler: Update tx data on rising edge")
oce.add("8", "CCSDS Handler: Update tx data on falling edge")
defs.add_service(CustomServiceList.CCSDS_HANDLER.value, "CCSDS Handler", oce)
def add_str_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_star_tracker = {
"0": (
"Star Tracker: Mode On, Submode Bootloader",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"1": ("Star Tracker: Mode On, Submode Firmware", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Star Tracker: Mode Normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Star Tracker: Mode Off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Star Tracker: Mode Raw", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Star Tracker: Ping", {OpCodeDictKeys.TIMEOUT: 5.0}),
"6": (
"Star Tracker: Switch to bootloader program",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"7": ("Star Tracker: Request temperature", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("Star Tracker: Request version", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("Star Tracker: Request interface", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("Star Tracker: Request power", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": (
"Star Tracker: Set subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"12": (
"Star Tracker: Boot image (requires bootloader mode)",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": ("Star Tracker: Request time", {OpCodeDictKeys.TIMEOUT: 2.0}),
"14": ("Star Tracker: Request solution", {OpCodeDictKeys.TIMEOUT: 2.0}),
"15": ("Star Tracker: Upload image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"16": ("Star Tracker: Download image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"17": ("Star Tracker: Set limit parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"18": ("Star Tracker: Set tracking parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"19": ("Star Tracker: Set mounting parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"20": ("Star Tracker: Set camera parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"22": (
"Star Tracker: Set centroiding parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"23": ("Star Tracker: Set LISA parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"24": ("Star Tracker: Set matching parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"25": (
"Star Tracker: Set validation parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"26": ("Star Tracker: Set algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"27": ("Star Tracker: Take image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"28": ("Star Tracker: Stop str helper", {OpCodeDictKeys.TIMEOUT: 2.0}),
"30": (
"Star Tracker: Set name of download image",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"31": ("Star Tracker: Request histogram", {OpCodeDictKeys.TIMEOUT: 2.0}),
"32": ("Star Tracker: Request contrast", {OpCodeDictKeys.TIMEOUT: 2.0}),
"33": ("Star Tracker: Set json filename", {OpCodeDictKeys.TIMEOUT: 2.0}),
"35": ("Star Tracker: Flash read", {OpCodeDictKeys.TIMEOUT: 2.0}),
"36": ("Star Tracker: Set flash read filename", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("Star Tracker: Get checksum", {OpCodeDictKeys.TIMEOUT: 2.0}),
"49": ("Star Tracker: Request camera parameter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"50": ("Star Tracker: Request limits", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": (
"Star Tracker: Set image processor parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"Star Tracker: (EGSE only) Load camera ground config ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": (
"Star Tracker: (EGSE only) Load camera flight config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"54": (
"Star Tracker: Request log level parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"55": (
"Star Tracker: Request mounting parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"56": (
"Star Tracker: Request image processor parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"57": (
"Star Tracker: Request centroiding parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"58": ("Star Tracker: Request lisa parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"59": (
"Star Tracker: Request matching parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"60": (
"Star Tracker: Request tracking parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"61": (
"Star Tracker: Request validation parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"62": ("Star Tracker: Request algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"63": (
"Star Tracker: Request subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"64": (
"Star Tracker: Request log subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"65": (
"Star Tracker: Request debug camera parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"66": ("Star Tracker: Set log level parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"67": (
"Star Tracker: Set log subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"68": (
"Star Tracker: Set debug camera parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"69": ("Star Tracker: Firmware update", {OpCodeDictKeys.TIMEOUT: 2.0}),
"70": (
"Star Tracker: Disable timestamp generation",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"71": (
"Star Tracker: Enable timestamp generation",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
cmd_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
def add_str_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "Star Tracker: Mode On, Submode Bootloader")
oce.add("1", "Star Tracker: Mode On, Submode Firmware")
oce.add("2", "Star Tracker: Mode Normal")
oce.add("3", "Star Tracker: Mode Off")
oce.add("4", "Star Tracker: Mode Raw")
oce.add("5", "Star Tracker: Ping")
oce.add("6", "Star Tracker: Switch to bootloader program")
oce.add("7", "Star Tracker: Request temperature")
oce.add("8", "Star Tracker: Request version")
oce.add("9", "Star Tracker: Request interface")
oce.add("10", "Star Tracker: Request power")
oce.add("11", "Star Tracker: Set subscription parameters")
oce.add("12", "Star Tracker: Boot image (requires bootloader mode)")
oce.add("13", "Star Tracker: Request time")
oce.add("14", "Star Tracker: Request solution")
oce.add("15", "Star Tracker: Upload image")
oce.add("16", "Star Tracker: Download image")
oce.add("17", "Star Tracker: Set limit parameters")
oce.add("17", "Star Tracker: Set limit parameters")
oce.add("18", "Star Tracker: Set tracking parameters")
oce.add("19", "Star Tracker: Set mounting parameters")
oce.add("20", "Star Tracker: Set camera parameters")
oce.add("22", "Star Tracker: Set centroiding parameters")
oce.add("23", "Star Tracker: Set LISA parameters")
oce.add("24", "Star Tracker: Set matching parameters")
oce.add("25", "Star Tracker: Set validation parameters")
oce.add("26", "Star Tracker: Set algo parameters")
oce.add("27", "Star Tracker: Take image")
oce.add("28", "Star Tracker: Stop str helper")
oce.add("30", "Star Tracker: Set name of download image")
oce.add("31", "Star Tracker: Request histogram")
oce.add("32", "Star Tracker: Request contrast")
oce.add("33", "Star Tracker: Set json filename")
oce.add("35", "Star Tracker: Flash read")
oce.add("36", "Star Tracker: Set flash read filename")
oce.add("37", "Star Tracker: Get checksum")
oce.add("49", "Star Tracker: Request camera parameter")
oce.add("50", "Star Tracker: Request limits")
oce.add("51", "Star Tracker: Set image processor parameters")
oce.add("52", "Star Tracker: (EGSE only) Load camera ground config")
oce.add("53", "Star Tracker: (EGSE only) Load camera flight config")
oce.add("54", "Star Tracker: Request log level parameters")
oce.add("55", "Star Tracker: Request mounting parameters")
oce.add("56", "Star Tracker: Request image processor parameters")
oce.add("57", "Star Tracker: Request centroiding parameters")
oce.add("58", "Star Tracker: Request lisa parameters")
oce.add("59", "Star Tracker: Request matching parameters")
oce.add("60", "Star Tracker: Request tracking parameters")
oce.add("61", "Star Tracker: Request validation parameters")
oce.add("62", "Star Tracker: Request algo parameters")
oce.add("63", "Star Tracker: Request subscription parameters")
oce.add("64", "Star Tracker: Request log subscription parameters")
oce.add("65", "Star Tracker: Request debug camera parameters")
oce.add("66", "Star Tracker: Set log level parameters")
oce.add("67", "Star Tracker: Set log subscription parameters")
oce.add("68", "Star Tracker: Set debug camera parameters")
oce.add("69", "Star Tracker: Firmware update")
oce.add("70", "Star Tracker: Disable timestamp generation")
oce.add("71", "Star Tracker: Enable timestamp generation")
defs.add_service(CustomServiceList.STAR_TRACKER.value, "Star Tracker", oce)
def add_syrlinks_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_syrlinks_handler = {
"0": ("Syrlinks Handler: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Syrlinks Handler: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Syrlinks Handler: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Syrlinks Handler: Set TX standby", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Syrlinks Handler: Set TX modulation", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Syrlinks Handler: Set TX carrier wave", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Syrlinks Handler: Read TX status", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Syrlinks Handler: Read TX waveform", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": (
"Syrlinks Handler: Read TX AGC value high byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"9": (
"Syrlinks Handler: Read TX AGC value low byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"12": (
"Syrlinks Handler: Write LCL config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": (
"Syrlinks Handler: Read RX status registers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"14": (
"Syrlinks Handler: Read LCL config register",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"15": (
"Syrlinks Handler: Set waveform OQPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"16": (
"Syrlinks Handler: Set waveform BPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"17": (
"Syrlinks Handler: Set second config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"18": (
"Syrlinks Handler: Enable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"19": (
"Syrlinks Handler: Disable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_syrlinks_handler_tuple = (
"Syrlinks Handler",
op_code_dict_srv_syrlinks_handler,
def add_syrlinks_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "Syrlinks Handler: Set mode off")
oce.add("1", "Syrlinks Handler: Set mode on")
oce.add("2", "Syrlinks Handler: Set mode normal")
oce.add("3", "Syrlinks Handler: Set TX standby")
oce.add("4", "Syrlinks Handler: Set TX modulation")
oce.add("5", "Syrlinks Handler: Set TX carrier wave")
oce.add("6", "Syrlinks Handler: Read TX status")
oce.add("7", "Syrlinks Handler: Read TX waveform")
oce.add("8", "Syrlinks Handler: Read TX AGC value high byte")
oce.add("9", "Syrlinks Handler: Read TX AGC value low byte")
oce.add("12", "Syrlinks Handler: Write LCL config")
oce.add("13", "Syrlinks Handler: Read RX status registers")
oce.add("14", "Syrlinks Handler: Read LCL config register")
oce.add("15", "Syrlinks Handler: Set waveform OQPSK")
oce.add("16", "Syrlinks Handler: Set waveform BPSK")
oce.add("17", "Syrlinks Handler: Set second config")
oce.add("18", "Syrlinks Handler: Enable debug output")
oce.add("19", "Syrlinks Handler: Disable debug output")
defs.add_service(CustomServiceList.SYRLINKS.value, "Syrlinks Handler", oce)
def add_bpx_cmd_definitions(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=BpxOpCodes.HK, info="Request BPX HK")
oce.add(keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count")
oce.add(keys=BpxOpCodes.REQUEST_CFG, info="Request Configuration Struct (Step 1)")
oce.add(
keys=BpxOpCodes.REQUEST_CFG_HK, info="Request Configuration Struct HK (Step 2)"
)
cmd_dict[CustomServiceList.SYRLINKS.value] = service_syrlinks_handler_tuple
def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=BpxOpCodes.HK, info="Request BPX HK"
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=BpxOpCodes.RST_BOOT_CNT, info="Reset Boot Count"
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=BpxOpCodes.REQUEST_CFG,
info="Request Configuration Struct (Step 1)",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=BpxOpCodes.REQUEST_CFG_HK,
info="Request Configuration Struct HK (Step 2)",
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=BpxOpCodes.REBOOT, info="Reboot Command"
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
oce.add(keys=BpxOpCodes.REBOOT, info="Reboot Command")
defs.add_service(
name=CustomServiceList.BPX_BATTERY.value,
info="BPX Battery Handler",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
def add_time_cmds(cmd_dict: ServiceOpCodeDictT):
def add_time_cmds(defs: TmTcDefWrapper):
from pus_tc.system.time import OpCodes, Info
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
oce = OpCodeEntry()
oce.add(
keys=OpCodes.SET_CURRENT_TIME,
info=Info.SET_CURRENT_TIME,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.TIME.value,
info="Time Service",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_imtq = {
"0": ("Mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("IMTQ perform pos X self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("IMTQ perform neg X self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("IMTQ perform pos Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("IMTQ perform neg Y self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("IMTQ perform pos Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("IMTQ perform neg Z self test", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("IMTQ command dipole", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("IMTQ get commanded dipole", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("IMTQ get engineering hk set", {OpCodeDictKeys.TIMEOUT: 2.0}),
"12": (
"IMTQ get calibrated MTM measurement one shot",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": ("IMTQ get raw MTM measurement one shot", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_imtq_tuple = ("IMTQ Device", op_code_dict_srv_imtq)
cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
def add_imtq_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "Mode Off")
oce.add("1", "Mode On")
oce.add("2", "Mode Normal")
oce.add("3", "IMTQ perform pos X self test")
oce.add("4", "IMTQ perform neg X self test")
oce.add("5", "IMTQ perform pos Y self test")
oce.add("6", "IMTQ perform neg Y self test")
oce.add("7", "IMTQ perform pos Z self test")
oce.add("8", "IMTQ perform neg Z self test")
oce.add("9", "IMTQ command dipole")
oce.add("10", "IMTQ get commanded dipole")
oce.add("11", "IMTQ get engineering hk set")
oce.add("12", "IMTQ get calibrated MTM measurement one shot")
oce.add("13", "IMTQ get raw MTM measurement one shot")
defs.add_service(CustomServiceList.IMTQ.value, "IMQT Device", oce)
def add_ploc_mpsoc_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_ploc_mpsoc = {
"0": ("Ploc MPSoC: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Ploc MPSoC: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Ploc MPSoC: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Ploc MPSoC: Memory write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Ploc MPSoC: Memory read", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Ploc MPSoC: Flash write", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Ploc MPSoC: Flash delete", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Ploc MPSoC: Replay start", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("Ploc MPSoC: Replay stop", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("Ploc MPSoC: Downlink pwr on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("Ploc MPSoC: Downlink pwr off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("Ploc MPSoC: Replay write sequence", {OpCodeDictKeys.TIMEOUT: 2.0}),
"12": ("Ploc MPSoC: OBSW reset sequence count", {OpCodeDictKeys.TIMEOUT: 2.0}),
"13": ("Ploc MPSoC: Read DEADBEEF address", {OpCodeDictKeys.TIMEOUT: 2.0}),
"14": ("Ploc MPSoC: Mode replay", {OpCodeDictKeys.TIMEOUT: 2.0}),
"15": ("Ploc MPSoC: Mode idle", {OpCodeDictKeys.TIMEOUT: 2.0}),
"16": ("Ploc MPSoC: Tc cam command send", {OpCodeDictKeys.TIMEOUT: 2.0}),
"17": ("Ploc MPSoC: Set UART TX tristate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"18": ("Ploc MPSoC: Relesase UART TX", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_mpsoc_tuple = ("Ploc MPSoC", op_code_dict_srv_ploc_mpsoc)
cmd_dict[CustomServiceList.PLOC_MPSOC.value] = service_ploc_mpsoc_tuple
def add_ploc_mpsoc_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "Ploc MPSoC: Set mode off")
oce.add("1", "Ploc MPSoC: Set mode on")
oce.add("2", "Ploc MPSoC: Set mode normal")
oce.add("3", "Ploc MPSoC: Memory write")
oce.add("4", "Ploc MPSoC: Memory read")
oce.add("5", "Ploc MPSoC: Flash write")
oce.add("6", "Ploc MPSoC: Flash delete")
oce.add("7", "Ploc MPSoC: Replay start")
oce.add("8", "Ploc MPSoC: Replay stop")
oce.add("9", "Ploc MPSoC: Downlink pwr on")
oce.add("10", "Ploc MPSoC: Downlink pwr off")
oce.add("11", "Ploc MPSoC: Replay write sequence")
oce.add("12", "Ploc MPSoC: OBSW reset sequence count")
oce.add("13", "Ploc MPSoC: Read DEADBEEF address")
oce.add("14", "Ploc MPSoC: Mode replay")
oce.add("15", "Ploc MPSoC: Mode idle")
oce.add("16", "Ploc MPSoC: Tc cam command send")
oce.add("17", "Ploc MPSoC: Set UART TX tristate")
oce.add("18", "Ploc MPSoC: Relesase UART TX")
defs.add_service(CustomServiceList.PLOC_MPSOC.value, "Ploc MPSoC", oce)
def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_ploc_mem_dumper = {
"0": ("PLOC Memory Dumper: MRAM dump", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_memory_dumper_tuple = (
"PLOC Memory Dumper",
op_code_dict_ploc_mem_dumper,
def add_ploc_supv_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add("0", "PLOC Memory Dumper: MRAM dump")
defs.add_service(
CustomServiceList.PLOC_MEMORY_DUMPER.value, "PLOC Memory Dumper", oce
)
op_code_dict_srv_ploc_supv = {
"0": ("PLOC Supervisor: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PLOC Supervisor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("PLOC Supervisor: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("PLOC Supervisor: Get HK Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("PLOC Supervisor: Start MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("PLOC Supervisor: Shutdown MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": (
"PLOC Supervisor: Select MPSoC boot image",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"8": ("PLOC Supervisor: Set max restart tries", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("PLOC Supervisor: Reset MPSoC", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("PLOC Supervisor: Set time reference", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": ("PLOC Supervisor: Set boot timeout", {OpCodeDictKeys.TIMEOUT: 2.0}),
"12": ("PLOC Supervisor: Disable Hk", {OpCodeDictKeys.TIMEOUT: 2.0}),
"13": (
"PLOC Supervisor: Request boot status report",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"17": ("PLOC Supervisor: Enable latchup alert", {OpCodeDictKeys.TIMEOUT: 2.0}),
"18": ("PLOC Supervisor: Disable latchup alert", {OpCodeDictKeys.TIMEOUT: 2.0}),
"20": ("PLOC Supervisor: Set alert limit", {OpCodeDictKeys.TIMEOUT: 2.0}),
"23": (
"PLOC Supervisor: Set ADC enabled channels",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"24": (
"PLOC Supervisor: Set ADC window and stride",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"25": ("PLOC Supervisor: Set ADC threshold", {OpCodeDictKeys.TIMEOUT: 2.0}),
"26": (
"PLOC Supervisor: Request latchup status report",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"27": ("PLOC Supervisor: Copy ADC data to MRAM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"30": ("PLOC Supervisor: Run auto EM tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"31": ("PLOC Supervisor: MRAM Wipe", {OpCodeDictKeys.TIMEOUT: 2.0}),
"35": ("PLOC Supervisor: Set GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"36": ("PLOC Supervisor: Read GPIO", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("PLOC Supervisor: Restart supervisor", {OpCodeDictKeys.TIMEOUT: 2.0}),
"38": (
"PLOC Supervisor: Factory reset clear all",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"39": (
"PLOC Supervisor: Factory reset clear mirror entries",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"40": (
"PLOC Supervisor: Factory reset clear circular entries",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"42": ("PLOC Supervisor: Perform update", {OpCodeDictKeys.TIMEOUT: 2.0}),
"43": (
"PLOC Supervisor: Terminate supervisor process",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"44": ("PLOC Supervisor: Start MPSoC quiet", {OpCodeDictKeys.TIMEOUT: 2.0}),
"45": ("PLOC Supervisor: Set shutdown timeout", {OpCodeDictKeys.TIMEOUT: 2.0}),
"46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}),
"47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": (
"PLOC Supervisor: Logging request event buffers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"PLOC Supervisor: Logging clear counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}),
"54": (
"PLOC Supervisor: Logging request counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
"56": ("PLOC Supervisor: Reset PL", {OpCodeDictKeys.TIMEOUT: 2.0}),
"57": ("PLOC Supervisor: Enable NVMs", {OpCodeDictKeys.TIMEOUT: 2.0}),
"58": ("PLOC Supervisor: Continue update", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_ploc_supv_tuple = ("PLOC Supervisor", op_code_dict_srv_ploc_supv)
cmd_dict[CustomServiceList.PLOC_SUPV.value] = service_ploc_supv_tuple
cmd_dict[
CustomServiceList.PLOC_MEMORY_DUMPER.value
] = service_ploc_memory_dumper_tuple
oce = OpCodeEntry()
oce.add("1", "PLOC Supervisor: Set mode off")
oce.add("2", "PLOC Supervisor: Set mode normal")
oce.add("3", "PLOC Supervisor: Get HK Report")
oce.add("5", "PLOC Supervisor: Start MPSoC")
oce.add("6", "PLOC Supervisor: Shutdown MPSoC")
oce.add("7", "PLOC Supervisor: Select MPSoC boot image")
oce.add("8", "PLOC Supervisor: Set max restart tries")
oce.add("9", "PLOC Supervisor: Reset MPSoC")
oce.add("10", "PLOC Supervisor: Set time reference")
oce.add("11", "PLOC Supervisor: Set boot timeout")
oce.add("12", "PLOC Supervisor: Disable Hk")
oce.add("13", "PLOC Supervisor: Request boot status report")
oce.add("17", "PLOC Supervisor: Enable latchup alert")
oce.add("18", "PLOC Supervisor: Disable latchup alert")
oce.add("20", "PLOC Supervisor: Set alert limit")
oce.add("23", "PLOC Supervisor: Set ADC enabled channels")
oce.add("24", "PLOC Supervisor: Set ADC window and stride")
oce.add("25", "PLOC Supervisor: Set ADC threshold")
oce.add("26", "PLOC Supervisor: Request latchup status report")
oce.add("27", "PLOC Supervisor: Copy ADC data to MRAM")
oce.add("30", "PLOC Supervisor: Run auto EM tests")
oce.add("31", "PLOC Supervisor: MRAM Wipe")
oce.add("35", "PLOC Supervisor: Set GPIO")
oce.add("36", "PLOC Supervisor: Read GPIO")
oce.add("37", "PLOC Supervisor: Restart supervisor")
oce.add("38", "PLOC Supervisor: Factory reset clear all")
oce.add("39", "PLOC Supervisor: Factory reset clear mirror entries")
oce.add("40", "PLOC Supervisor: Factory reset clear circular entries")
oce.add("42", "PLOC Supervisor: Perform update")
oce.add("43", "PLOC Supervisor: Terminate supervisor process")
oce.add("44", "PLOC Supervisor: Start MPSoC quiet")
oce.add("45", "PLOC Supervisor: Set shutdown timeout")
oce.add("46", "PLOC Supervisor: Factory flash")
oce.add("47", "PLOC Supervisor: Enable auto TM")
oce.add("48", "PLOC Supervisor: Disable auto TM")
oce.add("51", "PLOC Supervisor: Logging request event buffers")
oce.add("52", "PLOC Supervisor: Logging clear counters")
oce.add("53", "PLOC Supervisor: Logging set topic")
oce.add("54", "PLOC Supervisor: Logging request counters")
oce.add("55", "PLOC Supervisor: Request ADC Report")
oce.add("56", "PLOC Supervisor: Reset PL")
oce.add("57", "PLOC Supervisor: Enable NVMs")
oce.add("58", "PLOC Supervisor: Continue update")
defs.add_service(CustomServiceList.PLOC_SUPV.value, "PLOC Supervisor", oce)
def add_system_cmds(cmd_dict: ServiceOpCodeDictT):
def add_system_cmds(defs: TmTcDefWrapper):
from pus_tc.system.acs import AcsOpCodes, SusOpCodes
import pus_tc.system.tcs as tcs
import pus_tc.system.controllers as controllers
default_opts = generate_op_code_options(
enter_listener_mode=False, custom_timeout=8.0
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
oce = OpCodeEntry()
oce.add(
keys=AcsOpCodes.ACS_ASS_A_SIDE,
info="Switch to ACS board A side",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_B_SIDE,
info="Switch to ACS board B side",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_DUAL_MODE,
info="Switch to ACS board dual mode",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_A_ON,
info="Switch ACS board A side on",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_B_ON,
info="Switch ACS board B side on",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_DUAL_ON,
info="Switch ACS board dual mode on",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=AcsOpCodes.ACS_ASS_OFF,
info="Switch off ACS board",
options=default_opts,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.ACS_ASS.value,
info="ACS Assemblies",
op_code_entry=op_code_dict,
defs.add_service(
name=CustomServiceList.ACS_ASS.value, info="ACS Assemblies", op_code_entry=oce
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
oce = OpCodeEntry()
oce.add(
keys=SusOpCodes.SUS_ASS_NOM_SIDE,
info="Switch SUS board to nominal side",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=SusOpCodes.SUS_ASS_RED_SIDE,
info="Switch SUS board to redundant side",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=SusOpCodes.SUS_ASS_OFF,
info="Switch off SUS board",
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=SusOpCodes.SUS_ASS_DUAL_MODE,
info="Switch SUS board to dual mode",
options=default_opts,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.SUS_ASS.value,
info="SUS Assembly",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
oce = OpCodeEntry()
oce.add(
keys=tcs.OpCodes.TCS_BOARD_ASS_NORMAL,
info=tcs.Info.TCS_BOARD_ASS_NORMAL,
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=tcs.OpCodes.TCS_BOARD_ASS_OFF,
info=tcs.Info.TCS_BOARD_ASS_OFF,
options=default_opts,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.TCS_ASS.value,
info="TCS Board Assembly",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
oce = OpCodeEntry()
oce.add(
keys=controllers.OpCodes.THERMAL_CONTROLLER,
info=controllers.Info.THERMAL_CONTROLLER,
options=default_opts,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=controllers.OpCodes.CORE_CONTROLLER,
info=controllers.Info.CORE_CONTROLLER,
options=default_opts,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.CONTROLLERS.value,
info="Controllers",
op_code_entry=op_code_dict,
op_code_entry=oce,
)

View File

@ -6,6 +6,7 @@
import struct
from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
@ -19,7 +20,7 @@ from gomspace.gomspace_common import Info as GsInfo
from config.object_ids import ACU_HANDLER_ID
from pus_tc.devs.p60dock import P60DockConfigTable
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class ACUConfigTable:
@ -47,43 +48,37 @@ class Info:
TEST = "ACU Test"
def add_acu_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
def add_acu_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.GET_PARAM,
info=GsInfo.GET_PARAMETER,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.SET_PARAM,
info=GsInfo.SET_PARAMETER,
)
add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.TEST, info=Info.TEST)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
op_code_entry=op_code_dict,
oce.add(keys=OpCodes.TEST, info=Info.TEST)
defs.add_service(
name=CustomServiceList.ACU.value,
info="ACU Device",
op_code_entry=oce,
)
def pack_acu_commands(object_id: ObjectId, q: QueueHelper, op_code: str) -> TcQueueT:
def pack_acu_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd("Handling ACU command")
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("ACU: Print channel stats")
@ -124,8 +119,6 @@ def pack_acu_commands(object_id: ObjectId, q: QueueHelper, op_code: str) -> TcQu
)
pack_test_cmds(object_id=object_id, q=q)
return q
class ACUTestProcedure:
"""
@ -150,7 +143,7 @@ class ACUTestProcedure:
off = False
def pack_test_cmds(object_id: ObjectId, q: QueueHelper):
def pack_test_cmds(object_id: ObjectIdU32, q: QueueHelper):
if ACUTestProcedure.all or ACUTestProcedure.reboot:
q.add_log_cmd("ACU: Reboot")
q.add_pus_tc(gs.pack_reboot_command(object_id))

View File

@ -9,7 +9,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class CommandIds:
@ -32,7 +32,7 @@ class CommandIds:
UPDATE_ON_FALLING_EDGE = 8
def pack_ccsds_handler_test(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_ccsds_handler_test(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
obyt = object_id.as_bytes
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code == "0":

View File

@ -1,6 +1,7 @@
import enum
from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.logging import get_console_logger
@ -23,19 +24,14 @@ class SetIds:
HK = 0
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
op_code_entry=op_code_dict,
def add_gps_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.RESET_GNSS, info=Info.RESET_GNSS)
oce.add(keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK)
defs.add_service(
name=CustomServiceList.GPS_CTRL.value,
info="GPS/GNSS Controller",
op_code_entry=oce,
)

View File

@ -7,8 +7,9 @@ import enum
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.utility.obj_id import ObjectId
from tmtccmd.utility.obj_id import ObjectIdU32
from tmtccmd.tc.pus_201_fsfw_health import (
pack_set_health_cmd_data,
FsfwHealth,
@ -52,31 +53,16 @@ class ActionIds(enum.IntEnum):
SWITCH_HEATER = 0
def add_heater_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_HEALTHY_CMD,
info=Info.HEATER_HEALTHY_CMD,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_EXT_CTRL,
info=Info.HEATER_EXT_CTRL,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_FAULTY_CMD,
info=Info.HEATER_FAULTY_CMD,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
def add_heater_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD)
oce.add(keys=OpCodes.HEATER_HEALTHY_CMD, info=Info.HEATER_HEALTHY_CMD)
oce.add(keys=OpCodes.HEATER_EXT_CTRL, info=Info.HEATER_EXT_CTRL)
oce.add(keys=OpCodes.HEATER_FAULTY_CMD, info=Info.HEATER_FAULTY_CMD)
defs.add_service(
name=CustomServiceList.HEATER.value,
info="Heater Device",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
@ -133,7 +119,7 @@ def pack_heater_cmds(object_id: bytearray, op_code: str, q: QueueHelper):
)
def heater_idx_to_obj(heater: int) -> ObjectId:
def heater_idx_to_obj(heater: int) -> ObjectIdU32:
from config.object_ids import (
HEATER_0_OBC_BRD,
HEATER_1_PLOC_PROC_BRD,
@ -158,7 +144,7 @@ def heater_idx_to_obj(heater: int) -> ObjectId:
obj_dict = get_object_ids()
obj_id_obj = obj_dict.get(obj_id_array[heater])
if obj_id_obj is None:
return ObjectId.from_bytes(obj_id_array[heater])
return ObjectIdU32.from_bytes(obj_id_array[heater])
return obj_id_obj
@ -187,7 +173,7 @@ def prompt_heater() -> int:
def health_cmd(
q: QueueHelper,
heater_idx: int,
object_id: ObjectId,
object_id: ObjectIdU32,
health: FsfwHealth,
health_str: str,
):

View File

@ -15,7 +15,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
)
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class ImtqSetIds:
@ -44,7 +44,7 @@ class ImtqActionIds:
read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D])
def pack_imtq_test_into(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_imtq_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
)

View File

@ -5,15 +5,8 @@
@author J. Meier
@date 13.12.2020
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER
@ -89,7 +82,7 @@ class P60DockHkTable:
wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size)
def pack_p60dock_cmds(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_p60dock_cmds(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
objb = object_id.as_bytes
if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(Info.STACK_3V3_ON)

View File

@ -1,10 +1,5 @@
from config.definitions import CustomServiceList
from tmtccmd.config import (
ServiceOpCodeDictT,
add_op_code_entry,
add_service_op_code_entry,
OpCodeDictKeys,
)
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from pus_tc.devs.p60dock import P60OpCodes, GomspaceOpCodes, Info
from pus_tc.devs.pdu1 import Pdu1OpCodes
@ -13,288 +8,137 @@ from pus_tc.devs.acu import add_acu_cmds
from gomspace.gomspace_common import Info as GsInfo
def add_p60_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_ON,
info=Info.STACK_3V3_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_3V3_OFF,
info=Info.STACK_3V3_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_ON,
info=Info.STACK_5V_ON,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=P60OpCodes.STACK_5V_OFF,
info=Info.STACK_5V_OFF,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
def add_p60_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=P60OpCodes.STACK_3V3_ON, info=Info.STACK_3V3_ON)
oce.add(keys=P60OpCodes.STACK_3V3_OFF, info=Info.STACK_3V3_OFF)
oce.add(keys=P60OpCodes.STACK_5V_ON, info=Info.STACK_5V_ON)
oce.add(keys=P60OpCodes.STACK_5V_OFF, info=Info.STACK_5V_OFF)
oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE)
oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="P60 Dock: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="P60 Dock: Print Latchups",
)
add_op_code_entry(op_code_dict=op_code_dict, keys=P60OpCodes.TEST, info="P60 Tests")
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.P60DOCK.value,
info="P60 Device",
op_code_entry=op_code_dict,
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="P60 Dock: Print Latchups")
oce.add(keys=P60OpCodes.TEST, info="P60 Tests")
defs.add_service(
name=CustomServiceList.P60DOCK.value, info="P60 Device", op_code_entry=oce
)
def add_pdu1_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.TCS_BOARD_OFF.value,
info="PDU1: Turn TCS board off",
def add_pdu1_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=Pdu1OpCodes.TCS_BOARD_OFF.value, info="PDU1: Turn TCS board off")
oce.add(keys=Pdu1OpCodes.STAR_TRACKER_ON.value, info="PDU1: Turn star tracker on")
oce.add(keys=Pdu1OpCodes.STAR_TRACKER_OFF.value, info="PDU1: Turn star tracker off")
oce.add(keys=Pdu1OpCodes.SUS_NOMINAL_ON.value, info="PDU1: Turn SUS nominal on")
oce.add(keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value, info="PDU1: Turn SUS nominal off")
oce.add(keys=Pdu1OpCodes.ACS_A_SIDE_ON.value, info="PDU1: Turn ACS A side on")
oce.add(keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value, info="PDU1: Turn ACS A side off")
oce.add(keys=Pdu1OpCodes.SYRLINKS_ON.value, info="PDU1: Turn Syrlinks on")
oce.add(keys=Pdu1OpCodes.SYRLINKS_OFF.value, info="PDU1: Turn Syrlinks off")
oce.add(keys=Pdu1OpCodes.MGT_ON.value, info="PDU1: Turn MGT on")
oce.add(keys=Pdu1OpCodes.MGT_OFF.value, info="PDU1: Turn MGT off")
oce.add(keys=Pdu1OpCodes.PLOC_ON.value, info="PDU1: Turn PLOC on")
oce.add(keys=Pdu1OpCodes.PLOC_OFF.value, info="PDU1: Turn PLOC off")
oce.add(keys=Pdu1OpCodes.SCEX_ON.value, info="PDU1: Turn Solar Cell Experiment on")
oce.add(
keys=Pdu1OpCodes.SCEX_OFF.value, info="PDU1: Turn Solar Cell Experiment off"
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.STAR_TRACKER_ON.value,
info="PDU1: Turn star tracker on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.STAR_TRACKER_OFF.value,
info="PDU1: Turn star tracker off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SUS_NOMINAL_ON.value,
info="PDU1: Turn SUS nominal on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SUS_NOMINAL_OFF.value,
info="PDU1: Turn SUS nominal off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.ACS_A_SIDE_ON.value,
info="PDU1: Turn ACS A side on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.ACS_A_SIDE_OFF.value,
info="PDU1: Turn ACS A side off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SYRLINKS_ON.value,
info="PDU1: Turn Syrlinks on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SYRLINKS_OFF.value,
info="PDU1: Turn Syrlinks off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.MGT_ON.value,
info="PDU1: Turn MGT on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.MGT_OFF.value,
info="PDU1: Turn MGT off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.PLOC_ON.value,
info="PDU1: Turn PLOC on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.PLOC_OFF.value,
info="PDU1: Turn PLOC off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SCEX_ON.value,
info="PDU1: Turn Solar Cell Experiment on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.SCEX_OFF.value,
info="PDU1: Turn Solar Cell Experiment off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE, info=GsInfo.REQUEST_CORE_HK_ONCE)
oce.add(keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE, info=GsInfo.REQUEST_AUX_HK_ONCE)
oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU1: Print Switches, Voltages, Currents",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu1OpCodes.TCS_BOARD_ON.value,
info="PDU1: Turn TCS board on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU1: Print Latchups",
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests"
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=GomspaceOpCodes.SET_PARAM, info="Set parameter"
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
oce.add(keys=Pdu1OpCodes.TCS_BOARD_ON.value, info="PDU1: Turn TCS board on")
oce.add(keys=GomspaceOpCodes.PRINT_LATCHUPS, info="PDU1: Print Latchups")
oce.add(keys=Pdu1OpCodes.TESTS.value, info="PDU1 Tests")
oce.add(keys=GomspaceOpCodes.SET_PARAM, info="Set parameter")
defs.add_service(
name=CustomServiceList.PDU1.value,
info="PDU1 Device",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
def add_pdu2_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, keys="0", info="PDU2 Tests")
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.ACS_SIDE_B_ON.value,
info="PDU2: Turn ACS Side B on",
def add_pdu2_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys="0", info="PDU2 Tests")
oce.add(keys=Pdu2OpCodes.ACS_SIDE_B_ON.value, info="PDU2: Turn ACS Side B on")
oce.add(keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value, info="PDU2: Turn ACS Side B off")
oce.add(keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value, info="PDU2: Turn SUS redundant on")
oce.add(
keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value, info="PDU2: Turn SUS redundant off"
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.ACS_SIDE_B_OFF.value,
info="PDU2: Turn ACS Side B off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.SUS_REDUNDANT_ON.value,
info="PDU2: Turn SUS redundant on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.SUS_REDUNDANT_OFF.value,
info="PDU2: Turn SUS redundant off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.RW_ON.value,
info="PDU2: Turn reaction wheels on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=Pdu2OpCodes.RW_OFF.value,
info="PDU2: Turn reaction wheels off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(keys=Pdu2OpCodes.RW_ON.value, info="PDU2: Turn reaction wheels on")
oce.add(keys=Pdu2OpCodes.RW_OFF.value, info="PDU2: Turn reaction wheels off")
oce.add(
keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_ON.value,
info="PDU2: PL PCDU Switch Channel Nominal (1) on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.PL_PCDU_VBAT_NOM_OFF.value,
info="PDU2: PL PCDU Switch Channel Nominal (1) off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_ON.value,
info="PDU2: PL PCDU Switch Channel Redundant (1) on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.PL_PCDU_VBAT_RED_OFF.value,
info="PDU2: PL PCDU Switch Channel Redundant (1) off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.TCS_HEATER_IN_ON.value,
info="PDU2: Switch TCS Heater Input on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.TCS_HEATER_IN_OFF.value,
info="PDU2: Switch TCS Heater Input off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_ON.value,
info="PDU2: Switch Solar Array Deployment On",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.SOLAR_ARRAY_DEPL_OFF.value,
info="PDU2: Switch Solar Array Deployment Off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.PL_CAMERA_ON.value,
info="PDU2: Turn payload camera on",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=Pdu2OpCodes.PL_CAMERA_OFF.value,
info="PDU2: Turn payload camera off",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.REQUEST_CORE_HK_ONCE,
info=GsInfo.REQUEST_CORE_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.REQUEST_AUX_HK_ONCE,
info=GsInfo.REQUEST_AUX_HK_ONCE,
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.PRINT_SWITCH_V_I,
info="PDU2: Print Switches, Voltages, Currents",
options={OpCodeDictKeys.TIMEOUT: 2.0},
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=GomspaceOpCodes.PRINT_LATCHUPS,
info="PDU2: Print Latchups",
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name="pdu2",
info="PDU2 Device",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
add_p60_cmds(cmd_dict)
add_pdu1_cmds(cmd_dict)
add_pdu2_cmds(cmd_dict)
add_acu_cmds(cmd_dict)
def add_pcdu_cmds(defs: TmTcDefWrapper):
add_p60_cmds(defs)
add_pdu1_cmds(defs)
add_pdu2_cmds(defs)
add_acu_cmds(defs)

View File

@ -55,7 +55,7 @@ class PDU1TestProcedure:
turn_channel_3_off = False
def pack_pdu1_commands(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_pdu1_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd("Commanding PDU1")
objb = object_id.as_bytes
if op_code == Pdu1OpCodes.TCS_BOARD_ON.value:

View File

@ -65,7 +65,7 @@ class PDU2TestProcedure:
request_hk_table = False
def pack_pdu2_commands(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_pdu2_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd("Testing PDU2")
objb = object_id.as_bytes
if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:

View File

@ -10,14 +10,14 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class ActionIds:
DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_ploc_memory_dumper_cmd(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}"
)

View File

@ -9,12 +9,10 @@
import struct
import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
@ -68,9 +66,7 @@ class PlocReplyIds(enum.IntEnum):
TM_CAM_CMD_RPT = 19
def pack_ploc_mpsoc_commands(
object_id: ObjectId, q: QueueHelper, op_code: str
) -> TcQueueT:
def pack_ploc_mpsoc_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
)

View File

@ -12,7 +12,7 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from utility.input_helper import InputHelper
LOGGER = get_console_logger()
@ -106,9 +106,7 @@ class SupvHkIds:
BOOT_STATUS_REPORT = 53
def pack_ploc_supv_commands(
object_id: ObjectId, q: QueueHelper, op_code: str
) -> TcQueueT:
def pack_ploc_supv_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}")
obyt = object_id.as_bytes
if op_code == "0":

View File

@ -4,17 +4,11 @@ import time
from typing import Optional
from config.definitions import CustomServiceList
from tmtccmd.config import (
QueueCommands,
ServiceOpCodeDictT,
add_op_code_entry,
add_service_op_code_entry,
)
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config import TmTcDefWrapper
from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,
)
@ -120,83 +114,39 @@ class ParamIds(enum.IntEnum):
INJECT_ALL_ON_FAILURE = 35
def add_pl_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_SSR,
info=Info.NORMAL_SSR,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_DRO,
info=Info.NORMAL_DRO,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_X8,
info=Info.NORMAL_X8,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_TX,
info=Info.NORMAL_TX,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_MPA,
info=Info.NORMAL_MPA,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.NORMAL_HPA,
info=Info.NORMAL_HPA,
)
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK
)
add_op_code_entry(
op_code_dict=op_code_dict,
def add_pl_pcdu_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.SWITCH_ON, info=Info.SWITCH_ON)
oce.add(keys=OpCodes.SWITCH_OFF, info=Info.SWITCH_OFF)
oce.add(keys=OpCodes.NORMAL_SSR, info=Info.NORMAL_SSR)
oce.add(keys=OpCodes.NORMAL_DRO, info=Info.NORMAL_DRO)
oce.add(keys=OpCodes.NORMAL_X8, info=Info.NORMAL_X8)
oce.add(keys=OpCodes.NORMAL_TX, info=Info.NORMAL_TX)
oce.add(keys=OpCodes.NORMAL_MPA, info=Info.NORMAL_MPA)
oce.add(keys=OpCodes.NORMAL_HPA, info=Info.NORMAL_HPA)
oce.add(keys=OpCodes.REQ_OS_HK, info=Info.REQ_OS_HK)
oce.add(
keys=OpCodes.INJECT_SSR_TO_DRO_FAILURE,
info="Inject failure SSR to DRO transition",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=OpCodes.INJECT_DRO_TO_X8_FAILURE,
info="Inject failure in DRO to X8 transition",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=OpCodes.INJECT_X8_TO_TX_FAILURE,
info="Inject failure in X8 to TX transition",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=OpCodes.INJECT_TX_TO_MPA_FAILURE,
info="Inject failure in TX to MPA transition",
)
add_op_code_entry(
op_code_dict=op_code_dict,
oce.add(
keys=OpCodes.INJECT_MPA_TO_HPA_FAILURE,
info="Inject failure in MPA to HPA transition",
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.INJECT_ALL_ON_FAILURE,
info="Inject failure in all on mode",
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.PL_PCDU.value,
info="PL PCDU",
op_code_entry=op_code_dict,
)
oce.add(keys=OpCodes.INJECT_ALL_ON_FAILURE, info="Inject failure in all on mode")
defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce)
def pack_pl_pcdu_commands(q: QueueHelper, op_code: str):
@ -439,11 +389,9 @@ def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int:
)
def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str):
def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str):
wait_time = request_wait_time()
tc_queue.appendleft(
(QueueCommands.PRINT, f"Updating {print_str} wait time to {wait_time}")
)
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None:
return
param_data = pack_scalar_double_param_app_data(
@ -452,8 +400,7 @@ def pack_wait_time_cmd(tc_queue: TcQueueT, param_id: int, print_str: str):
unique_id=param_id,
parameter=wait_time,
)
cmd = pack_fsfw_load_param_cmd(app_data=param_data)
tc_queue.appendleft(cmd.pack_command_tuple())
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str):

View File

@ -11,9 +11,10 @@ from config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class SetIds:
@ -45,30 +46,22 @@ class CommandIds:
DISABLE_DEBUG_OUTPUT = 5
def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, info=Info.ON, keys=OpCodes.ON)
add_op_code_entry(op_code_dict=op_code_dict, info=Info.OFF, keys=OpCodes.OFF)
add_op_code_entry(op_code_dict=op_code_dict, info=Info.NORMAL, keys=OpCodes.NORMAL)
add_op_code_entry(
op_code_dict=op_code_dict, info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE
)
add_op_code_entry(
op_code_dict=op_code_dict, info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON
)
add_op_code_entry(
op_code_dict=op_code_dict, info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
def add_rad_sens_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(info=Info.ON, keys=OpCodes.ON)
oce.add(info=Info.OFF, keys=OpCodes.OFF)
oce.add(info=Info.NORMAL, keys=OpCodes.NORMAL)
oce.add(info=Info.REQ_OS_HK, keys=OpCodes.REQ_HK_ONCE)
oce.add(info=Info.DEBUG_ON, keys=OpCodes.DEBUG_ON)
oce.add(info=Info.DEBUG_OFF, keys=OpCodes.DEBUG_OFF)
defs.add_service(
name=CustomServiceList.RAD_SENSOR.value,
info="Radiation Sensor",
op_code_entry=op_code_dict,
op_code_entry=oce,
)
def pack_rad_sensor_test_into(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}")
if op_code in OpCodes.ON:
@ -94,7 +87,7 @@ def pack_rad_sensor_test_into(object_id: ObjectId, q: QueueHelper, op_code: str)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def rad_sensor_mode_cmd(object_id: ObjectId, mode: Modes, info: str, q: QueueHelper):
def rad_sensor_mode_cmd(object_id: ObjectIdU32, mode: Modes, info: str, q: QueueHelper):
q.add_log_cmd(f"Rad sensor: {info}")
mode_data = pack_mode_data(object_id.as_bytes, mode, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -6,6 +6,7 @@
"""
import struct
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
@ -74,57 +75,42 @@ class RampTime:
MS_1000 = 1000
def add_rw_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED
)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoDevs.ON, keys=OpCodesDevs.ON)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.OFF, keys=OpCodesDevs.OFF
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.NML, keys=OpCodesDevs.NML
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
def add_rw_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED)
oce.add(info=InfoDevs.ON, keys=OpCodesDevs.ON)
oce.add(info=InfoDevs.OFF, keys=OpCodesDevs.OFF)
oce.add(info=InfoDevs.NML, keys=OpCodesDevs.NML)
oce.add(info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS)
oce.add(info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM)
defs.add_service(
name=CustomServiceList.REACTION_WHEEL_1.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 1",
op_code_entry=oce,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.REACTION_WHEEL_2.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 2",
op_code_entry=oce,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.REACTION_WHEEL_3.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 3",
op_code_entry=oce,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
defs.add_service(
name=CustomServiceList.REACTION_WHEEL_4.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 4",
op_code_entry=oce,
)
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.ON, keys=OpCodesAss.ON)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.NML, keys=OpCodesAss.NML)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.OFF, keys=OpCodesAss.OFF)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
oce = OpCodeEntry()
oce.add(info=InfoAss.ON, keys=OpCodesAss.ON)
oce.add(info=InfoAss.NML, keys=OpCodesAss.NML)
oce.add(info=InfoAss.OFF, keys=OpCodesAss.OFF)
defs.add_service(
name=CustomServiceList.RW_ASSEMBLY.value,
op_code_entry=op_code_dict,
info="Reaction Wheel Assembly",
op_code_entry=oce,
)

View File

@ -3,9 +3,10 @@ from typing import Optional
from config.definitions import CustomServiceList
from pus_tc.devs.pdec_handler import CommandIds
from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices
import config.object_ids as oids
from config.object_ids import get_object_ids
@ -44,20 +45,17 @@ class Info:
WIRTE_CONFIG = "Write config"
def specify_rtd_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.ON, info=Info.ON)
add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.NORMAL, info=Info.NORMAL)
add_op_code_entry(op_code_dict=op_code_dict, keys=OpCodes.OFF, info=Info.OFF)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
op_code_entry=op_code_dict,
name=CustomServiceList.RTD.value,
info="RTD commands",
def specify_rtd_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.ON, info=Info.ON)
oce.add(keys=OpCodes.NORMAL, info=Info.NORMAL)
oce.add(keys=OpCodes.OFF, info=Info.OFF)
defs.add_service(
name=CustomServiceList.RTD.value, info="RTD commands", op_code_entry=oce
)
def pack_rtd_commands(op_code: str, object_id: Optional[ObjectId], q: QueueHelper):
def pack_rtd_commands(op_code: str, object_id: Optional[ObjectIdU32], q: QueueHelper):
if object_id is not None and object_id not in RTD_IDS:
print("Specified object ID not a valid RTD ID")
object_id = None

View File

@ -12,7 +12,7 @@ from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from utility.input_helper import InputHelper
@ -150,7 +150,7 @@ class Submode:
FIRMWARE = 2
def pack_star_tracker_commands(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_star_tracker_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Generate command for star tracker with object id: {object_id.as_hex_string}"
)

View File

@ -12,7 +12,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class Commands:
@ -24,7 +24,7 @@ class ImagePathDefs:
uploadFile = "/mnt/sd0/startracker/gemma.bin"
def pack_str_img_helper_command(object_id: ObjectId, q: QueueHelper, op_code: str):
def pack_str_img_helper_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
q.add_log_cmd(
f"Testing star tracker image helper object id: {object_id.as_hex_string}"
)

View File

@ -5,17 +5,13 @@
@author J. Meier
@date 13.12.2020
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import struct
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class SetIds:
@ -41,9 +37,7 @@ class CommandIds:
DISABLE_DEBUG = 21
def pack_syrlinks_command(
object_id: ObjectId, q: QueueHelper, op_code: str
) -> TcQueueT:
def pack_syrlinks_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
obyt = object_id.as_bytes
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
if op_code == "0":

View File

@ -9,7 +9,7 @@ from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
class Tmp1075TestProcedure:
@ -33,7 +33,7 @@ class Tmp1075ActionIds:
start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02])
def pack_tmp1075_test_into(object_id: ObjectId, op_code: str, q: QueueHelper):
def pack_tmp1075_test_into(object_id: ObjectIdU32, op_code: str, q: QueueHelper):
q.add_log_cmd(
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
)

View File

@ -81,7 +81,7 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper):
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service_5_test_into(q=queue_helper)
if service == CoreServiceList.SERVICE_17.value:
queue_helper.add_pus_tc(pack_service_17_ping_command())
return queue_helper.add_pus_tc(pack_service_17_ping_command())
if service == CoreServiceList.SERVICE_200.value:
return pack_service200_test_into(q=queue_helper)
if service == CustomServiceList.P60DOCK.value:
@ -201,23 +201,3 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper):
if service == CustomServiceList.RW_ASSEMBLY.value:
return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code)
LOGGER.warning(f"Invalid Service {service}")
def pre_tc_send_cb(
queue_entry: Union[bytes, QueueCommands],
com_if: CommunicationInterface,
queue_info: Union[PusTelecommand, any],
file_logger: logging.Logger,
):
if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray):
log_raw_pus_tc(
packet=queue_entry,
srv_subservice=(queue_info.service, queue_info.subservice),
)
tc_info_string = f"Sent {queue_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=queue_entry)
elif isinstance(queue_entry, QueueCommands):
if queue_entry == QueueCommands.PRINT:
file_logger.info(queue_info)

View File

@ -10,9 +10,8 @@ from PyQt5.QtWidgets import (
from PyQt5 import QtCore
from tmtccmd.config import CoreModeList
from tmtccmd.core.globals_manager import get_global
from tmtccmd.config.definitions import CoreGlobalIds, CoreModeList
class Parameter:
@ -93,32 +92,29 @@ class ParameterDialog(QDialog):
"""
def prompt_parameters(parameterList):
gui = get_global(CoreGlobalIds.GUI)
mode = get_global(CoreGlobalIds.MODE)
# gui only works in cont mode right now
if gui and mode == CoreModeList.CONTINUOUS_MODE:
return _gui_prompt(parameterList)
else:
return _cli_prompt(parameterList)
def prompt_parameters_gui(param_list) -> dict:
return _gui_prompt(param_list)
def _gui_prompt(parameterList):
def prompt_parameters_cli(param_list) -> dict:
return _cli_prompt(param_list)
def _gui_prompt(param_list) -> dict:
dialog = ParameterDialog()
for parameter in parameterList:
for parameter in param_list:
dialog.addParameter(parameter["name"], parameter["defaultValue"])
dialog.exec_()
return dialog.getParameters()
def _cli_prompt(parameterList):
def _cli_prompt(param_list) -> dict:
result = {}
for parameter in parameterList:
userInput = input(
for parameter in param_list:
user_input = input(
"Specify {} [{}]: ".format(parameter["name"], parameter["defaultValue"])
)
if userInput == "":
userInput = parameter["defaultValue"]
result[parameter["name"]] = userInput
if user_input == "":
user_input = parameter["defaultValue"]
result[parameter["name"]] = user_input
return result

View File

@ -1,11 +1,11 @@
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from .common import command_mode
import config.object_ids as obj_ids
from pus_tc.prompt_parameters import prompt_parameters
from pus_tc.prompt_parameters import prompt_parameters_cli, prompt_parameters_gui
class OpCodes:
@ -18,13 +18,15 @@ class Info:
CORE_CONTROLLER = "ACS controller"
def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId):
parameters = prompt_parameters(
[
{"name": "Mode", "defaultValue": "2"},
{"name": "Submode", "defaultValue": "0"},
]
)
def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui: bool):
param_list = [
{"name": "Mode", "defaultValue": "2"},
{"name": "Submode", "defaultValue": "0"},
]
if gui:
parameters = prompt_parameters_gui(param_list)
else:
parameters = prompt_parameters_cli(param_list)
mode = int(parameters["Mode"])
if mode < 0 or mode > 2:
print("Invalid Mode, defaulting to OFF")
@ -39,7 +41,7 @@ def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectId):
)
def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId):
def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.OFF,
@ -49,7 +51,7 @@ def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectId):
)
def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId):
def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.ON,
@ -59,7 +61,7 @@ def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectId):
)
def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectId):
def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectIdU32):
command_mode(
object_id=object_id.as_bytes,
mode=Modes.NORMAL,

View File

@ -7,8 +7,10 @@ from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.config.tmtc import OpCodeEntry
from config.object_ids import CORE_CONTROLLER_ID
LOGGER = get_console_logger()
@ -66,60 +68,48 @@ class Copy(enum.IntEnum):
def add_core_controller_definitions(defs: TmTcDefWrapper):
od = dict()
add_op_code_entry(op_code_dict=od, keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC)
add_op_code_entry(op_code_dict=od, keys=OpCodes.REBOOT_FULL, info=Info.REBOOT_FULL)
add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_SELF, info="Reboot Self")
add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_0_0, info="Reboot 0 0")
add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_0_1, info="Reboot 0 1")
add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_1_0, info="Reboot 1 0")
add_op_code_entry(op_code_dict=od, keys=OpCodes.XSC_REBOOT_1_1, info="Reboot 1 1")
add_op_code_entry(
op_code_dict=od,
oce = OpCodeEntry()
oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCodes.REBOOT_FULL, info=Info.REBOOT_FULL)
oce.add(keys=OpCodes.XSC_REBOOT_SELF, info="Reboot Self")
oce.add(keys=OpCodes.XSC_REBOOT_0_0, info="Reboot 0 0")
oce.add(keys=OpCodes.XSC_REBOOT_0_1, info="Reboot 0 1")
oce.add(keys=OpCodes.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCodes.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(
keys=OpCodes.GET_HK,
info="Request housekeeping set",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.ENABLE_REBOOT_FILE_HANDLING,
info="Enable reboot file handling",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.DISABLE_REBOOT_FILE_HANDLING,
info="Disable reboot file handling",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.RESET_ALL_REBOOT_COUNTERS,
info="Reset all reboot counters",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_00,
info="Reset reboot counter 0 0",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_01,
info="Reset reboot counter 0 1",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_10,
info="Reset reboot counter 1 0",
)
add_op_code_entry(
op_code_dict=od,
oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_11,
info="Reset reboot counter 1 1",
)
add_service_op_code_entry(
srv_op_code_dict=defs,
name=CustomServiceList.CORE.value,
info="Core Controller",
op_code_entry=od,
)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands(q: QueueHelper, op_code: str):

View File

@ -7,6 +7,7 @@ from typing import List
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
from pus_tc.system.tcs import pack_tcs_sys_commands
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_11_tc_sched import (
@ -125,17 +126,14 @@ def generic_print(q: QueueHelper, info: dict):
q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})")
def add_proc_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
def add_proc_cmds(defs: TmTcDefWrapper):
oce = OpCodeEntry()
for proc_entry in PROC_INFO_DICT.values():
add_op_code_entry(
op_code_dict=op_code_dict, keys=proc_entry[0], info=proc_entry[1]
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
oce.add(keys=proc_entry[0], info=proc_entry[1])
defs.add_service(
name=CustomServiceList.PROCEDURE.value,
info="TV Test Procedures",
op_code_entry=op_code_dict,
op_code_entry=oce,
)

View File

@ -6,7 +6,6 @@ from pus_tc.devs.ploc_supervisor import SupvActionIds
from pus_tc.devs.star_tracker import StarTrackerActionIds
from gomspace.gomspace_common import GomspaceDeviceActionIds
from tmtccmd.logging import get_console_logger
from tmtccmd.config.definitions import DataReplyUnpacked
from tmtccmd.tm import Service8FsfwTm
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
@ -91,7 +90,6 @@ def handle_ploc_replies(
def handle_supervisor_replies(
action_id: int, printer: FsfwTmTcPrinter, custom_data: bytearray
):
reply = DataReplyUnpacked()
if action_id == SupvActionIds.DUMP_MRAM:
header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]]

View File

@ -1,7 +1,7 @@
import struct
from pus_tm.defs import PrintWrapper
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
from pus_tc.devs.gyros import L3gGyroSetIds, AdisGyroSetIds
@ -9,7 +9,7 @@ import config.object_ids as obj_ids
def handle_gyros_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if object_id.as_bytes in [
obj_ids.GYRO_0_ADIS_HANDLER_ID,
@ -28,7 +28,7 @@ def handle_gyros_hk_data(
def handle_adis_gyro_hk(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == AdisGyroSetIds.CORE_HK:
pw = PrintWrapper(printer)
@ -58,7 +58,7 @@ def handle_adis_gyro_hk(
def handle_l3g_gyro_hk(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == L3gGyroSetIds.CORE_HK:
pw = PrintWrapper(printer)

View File

@ -2,13 +2,13 @@ import struct
from pus_tm.defs import PrintWrapper
from pus_tc.devs.mgms import MgmRm3100SetIds, MgmLis3SetIds
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
import config.object_ids as obj_ids
def handle_mgm_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if object_id.as_bytes in [
obj_ids.MGM_0_LIS3_HANDLER_ID,
@ -24,7 +24,7 @@ def handle_mgm_hk_data(
def handle_mgm_lis3_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == MgmLis3SetIds.CORE_HK:
pw = PrintWrapper(printer)
@ -41,7 +41,7 @@ def handle_mgm_lis3_hk_data(
def handle_mgm_rm3100_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == MgmRm3100SetIds.CORE_HK:
pw = PrintWrapper(printer)

View File

@ -1,11 +1,11 @@
import struct
from pus_tm.defs import PrintWrapper, FsfwTmTcPrinter
from tmtccmd.utility.obj_id import ObjectId
from tmtccmd.utility.obj_id import ObjectIdU32
def handle_rw_hk_data(
printer: FsfwTmTcPrinter, object_id: ObjectId, set_id: int, hk_data: bytes
printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes
):
from pus_tc.devs.reaction_wheels import RwSetIds

View File

@ -2,12 +2,12 @@ import struct
from pus_tm.defs import PrintWrapper
from pus_tc.devs.sus import SetIds
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
def handle_sus_hk(
object_id: ObjectId, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int
object_id: ObjectIdU32, hk_data: bytes, printer: FsfwTmTcPrinter, set_id: int
):
pw = PrintWrapper(printer)
pw.dlog(f"Received SUS HK data from {object_id}")

View File

@ -9,7 +9,7 @@ from tmtccmd.tm.pus_3_fsfw_hk import (
HkContentType,
Service3FsfwTm,
)
from tmtccmd.utility.obj_id import ObjectId, ObjectIdDictT
from tmtccmd.utility.obj_id import ObjectIdU32, ObjectIdDictT
from tmtccmd.logging import get_console_logger
from pus_tm.devs.bpx_bat import handle_bpx_hk_data
@ -72,14 +72,14 @@ def handle_hk_packet(
def handle_regular_hk_print(
printer: FsfwTmTcPrinter,
object_id: ObjectId,
object_id: ObjectIdU32,
hk_packet: Service3Base,
hk_data: bytes,
):
objb = object_id.as_bytes
set_id = hk_packet.set_id
"""This function is called when a Service 3 Housekeeping packet is received."""
if object_id in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
handle_rw_hk_data(printer, object_id, set_id, hk_data)
elif objb == obj_ids.SYRLINKS_HANDLER_ID:
handle_syrlinks_hk_data(printer=printer, hk_data=hk_data, set_id=set_id)

View File

@ -3,7 +3,7 @@ import struct
from pus_tm.defs import PrintWrapper
from pus_tm.tcp_server_objects import *
from tmtccmd.utility import ObjectId
from tmtccmd.utility import ObjectIdU32
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
@ -14,7 +14,7 @@ class SetIds(enum.IntEnum):
def handle_thermal_controller_hk_data(
object_id: ObjectId, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
):
if set_id == SetIds.SENSOR_TEMPERATURE_SET:
pw = PrintWrapper(printer)
@ -42,8 +42,7 @@ def handle_thermal_controller_hk_data(
"TMP1075 1": tm_data[16],
"TMP1075 2": tm_data[17],
}
# print(parsed_data)
printer.file_logger.info(str(parsed_data))
tcp_server_sensor_temperatures.report_parsed_hk_data(
object_id, set_id, parsed_data
)

View File

@ -4,7 +4,7 @@ import json
import base64
from tmtccmd.logging import get_console_logger
from tmtccmd.utility.obj_id import ObjectId
from tmtccmd.utility.obj_id import ObjectIdU32
from dle_encoder import DleEncoder
LOGGER = get_console_logger()
@ -64,7 +64,7 @@ class TmTcpServer:
self.client_connection.close()
self.client_connection = None
def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes):
def report_raw_hk_data(self, object_id: ObjectIdU32, set_id: int, hk_data: bytes):
data_dict = {
"type": "TM",
@ -76,7 +76,9 @@ class TmTcpServer:
self._send_dictionary_over_socket(data_dict)
def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary):
def report_parsed_hk_data(
self, object_id: ObjectIdU32, set_id: int, data_dictionary
):
data_dict = {
"type": "TM",
"tmType": "Parsed HK",

View File

@ -53,7 +53,7 @@ from tmtccmd.config.args import (
)
from config import __version__
from config.definitions import PUS_APID
from config.hook_implementations import EiveHookObject
from config.hook import EiveHookObject
from pus_tm.factory_hook import pus_factory_hook
from pus_tc.procedure_packer import handle_default_procedure