some more consistency updates, delete obsolete file

This commit is contained in:
Robin Müller 2023-01-16 14:18:15 +01:00
parent e7609f02b9
commit 9d835e5705
No known key found for this signature in database
GPG Key ID: 11D4952C8CCEF814
8 changed files with 102 additions and 134 deletions

View File

@ -1,33 +0,0 @@
"""
@brief This file transfers definitions of global variables to the user.
@details Template configuration file. Copy this folder to the TMTC commander root and adapt
it to your needs.
"""
import enum
import argparse
# All globals can be added here and will be part of a globals dictionary.
from config.definitions import CustomServiceList, PUS_APID
from config.custom_mode_op import CustomModeList
from tmtccmd.config.definitions import CoreComInterfaces
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
from tmtccmd.logging import get_console_logger
LOGGER = get_console_logger()
class CustomGlobalIds(enum.Enum):
from enum import auto
pass
def set_globals_pre_args_parsing(gui: bool = False):
set_default_globals_pre_args_parsing(
gui=gui,
tc_apid=PUS_APID,
tm_apid=PUS_APID,
com_if_id=CoreComInterfaces.TCPIP_UDP.value,
)

View File

@ -16,7 +16,7 @@ from tmtccmd.tc.pus_8_funccmd import make_fsfw_action_cmd
from tmtccmd.util import ObjectIdU32, ObjectIdBase from tmtccmd.util import ObjectIdU32, ObjectIdBase
class GomspaceDeviceActionIds(enum.IntEnum): class GomspaceDeviceActionId(enum.IntEnum):
PING = 1 PING = 1
REBOOT = 4 REBOOT = 4
PARAM_GET = 0 PARAM_GET = 0
@ -64,7 +64,7 @@ class Channel:
def pack_request_config_command(object_id: bytes) -> PusTelecommand: def pack_request_config_command(object_id: bytes) -> PusTelecommand:
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id, action_id=GomspaceDeviceActionIds.REQUEST_CONFIG_TABLE object_id=object_id, action_id=GomspaceDeviceActionId.REQUEST_CONFIG_TABLE
) )
@ -90,7 +90,7 @@ def pack_get_param_command(
app_data += struct.pack("!B", parameter_size) app_data += struct.pack("!B", parameter_size)
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id, object_id=object_id,
action_id=GomspaceDeviceActionIds.PARAM_GET, action_id=GomspaceDeviceActionId.PARAM_GET,
user_data=app_data, user_data=app_data,
) )
@ -98,7 +98,7 @@ def pack_get_param_command(
def pack_set_float_param_command( def pack_set_float_param_command(
object_id: bytes, memory_address: bytes, parameter: float object_id: bytes, memory_address: bytes, parameter: float
) -> PusTelecommand: ) -> PusTelecommand:
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(4) app_data.append(4)
@ -111,7 +111,7 @@ def pack_set_float_param_command(
def pack_set_u8_param_command( def pack_set_u8_param_command(
object_id: bytes, memory_address: bytes, parameter: int object_id: bytes, memory_address: bytes, parameter: int
) -> PusTelecommand: ) -> PusTelecommand:
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(1) app_data.append(1)
@ -124,7 +124,7 @@ def pack_set_u8_param_command(
def pack_set_i8_param_command( def pack_set_i8_param_command(
object_id: bytes, memory_address: bytes, parameter: int object_id: bytes, memory_address: bytes, parameter: int
) -> PusTelecommand: ) -> PusTelecommand:
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(1) app_data.append(1)
@ -137,7 +137,7 @@ def pack_set_i8_param_command(
def pack_set_u16_param_command( def pack_set_u16_param_command(
object_id: bytes, memory_address: bytes, parameter: int object_id: bytes, memory_address: bytes, parameter: int
) -> PusTelecommand: ) -> PusTelecommand:
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(2) app_data.append(2)
@ -150,7 +150,7 @@ def pack_set_u16_param_command(
def pack_set_i16_param_command( def pack_set_i16_param_command(
object_id: bytes, memory_address: bytes, parameter: int object_id: bytes, memory_address: bytes, parameter: int
) -> PusTelecommand: ) -> PusTelecommand:
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(2) app_data.append(2)
@ -161,7 +161,7 @@ def pack_set_i16_param_command(
def pack_set_u32_param_command(object_id: bytes, memory_address: bytes, parameter: int): def pack_set_u32_param_command(object_id: bytes, memory_address: bytes, parameter: int):
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(4) app_data.append(4)
@ -172,7 +172,7 @@ def pack_set_u32_param_command(object_id: bytes, memory_address: bytes, paramete
def pack_set_i32_param_command(object_id: bytes, memory_address: bytes, parameter: int): def pack_set_i32_param_command(object_id: bytes, memory_address: bytes, parameter: int):
action_id = GomspaceDeviceActionIds.PARAM_SET action_id = GomspaceDeviceActionId.PARAM_SET
app_data = bytearray() app_data = bytearray()
app_data += memory_address app_data += memory_address
app_data.append(4) app_data.append(4)
@ -226,7 +226,7 @@ def pack_ping_command(object_id: ObjectIdU32, data: bytearray) -> PusTelecommand
""" """
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
action_id=GomspaceDeviceActionIds.PING, action_id=GomspaceDeviceActionId.PING,
user_data=data, user_data=data,
) )
@ -236,7 +236,7 @@ def pack_gnd_wdt_reset_command(object_id: ObjectIdBase) -> PusTelecommand:
@param object_id Object Id of the gomspace device handler. @param object_id Object Id of the gomspace device handler.
""" """
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.WDT_RESET object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.WDT_RESET
) )
@ -245,7 +245,7 @@ def pack_reboot_command(object_id: ObjectIdU32) -> PusTelecommand:
@param object_id The object id of the gomspace device handler. @param object_id The object id of the gomspace device handler.
""" """
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REBOOT object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.REBOOT
) )
@ -255,5 +255,5 @@ def pack_request_full_hk_table_command(object_id: ObjectIdU32) -> PusTelecommand
@param object_id The object id of the gomspace device handler. @param object_id The object id of the gomspace device handler.
""" """
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=object_id.as_bytes, action_id=GomspaceDeviceActionIds.REQUEST_HK_TABLE object_id=object_id.as_bytes, action_id=GomspaceDeviceActionId.REQUEST_HK_TABLE
) )

View File

@ -5,6 +5,7 @@
@author J. Meier @author J. Meier
@date 20.11.2021 @date 20.11.2021
""" """
import enum
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
@ -12,7 +13,7 @@ from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
class CommandIds: class CommandId(enum.IntEnum):
# Configures input rate of syrlinks to 400 Khz (results in downlink rate of 200 kbps) # Configures input rate of syrlinks to 400 Khz (results in downlink rate of 200 kbps)
SET_LOW_RATE = 0 SET_LOW_RATE = 0
# Configures input rate of syrlinks to 2000 Khz (results in downlink rate of 1000 kbps) # Configures input rate of syrlinks to 2000 Khz (results in downlink rate of 1000 kbps)
@ -39,42 +40,42 @@ def pack_ccsds_handler_test(
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code == "0": if op_code == "0":
q.add_log_cmd("CCSDS Handler: Set low rate") q.add_log_cmd("CCSDS Handler: Set low rate")
command = obyt + struct.pack("!I", CommandIds.SET_LOW_RATE) command = obyt + struct.pack("!I", CommandId.SET_LOW_RATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "1": if op_code == "1":
q.add_log_cmd("CCSDS Handler: Set high rate") q.add_log_cmd("CCSDS Handler: Set high rate")
command = obyt + struct.pack("!I", CommandIds.SET_HIGH_RATE) command = obyt + struct.pack("!I", CommandId.SET_HIGH_RATE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "2": if op_code == "2":
q.add_log_cmd("CCSDS Handler: Enables the transmitter") q.add_log_cmd("CCSDS Handler: Enables the transmitter")
command = obyt + struct.pack("!I", CommandIds.EN_TRANSMITTER) command = obyt + struct.pack("!I", CommandId.EN_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "3": if op_code == "3":
q.add_log_cmd("CCSDS Handler: Disables the transmitter") q.add_log_cmd("CCSDS Handler: Disables the transmitter")
command = obyt + struct.pack("!I", CommandIds.DIS_TRANSMITTER) command = obyt + struct.pack("!I", CommandId.DIS_TRANSMITTER)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "4": if op_code == "4":
q.add_log_cmd("CCSDS Handler: Set arbitrary bitrate") q.add_log_cmd("CCSDS Handler: Set arbitrary bitrate")
bitrate = int(input("Specify bit rate (bps): ")) bitrate = int(input("Specify bit rate (bps): "))
command = ( command = (
obyt obyt
+ struct.pack("!I", CommandIds.ARBITRARY_BITRATE) + struct.pack("!I", CommandId.ARBITRARY_BITRATE)
+ struct.pack("!I", bitrate) + struct.pack("!I", bitrate)
) )
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "5": if op_code == "5":
q.add_log_cmd("CCSDS Handler: Enable tx clock manipulator") q.add_log_cmd("CCSDS Handler: Enable tx clock manipulator")
command = obyt + struct.pack("!I", CommandIds.ENABLE_TX_CLK_MANIPULATOR) command = obyt + struct.pack("!I", CommandId.ENABLE_TX_CLK_MANIPULATOR)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "6": if op_code == "6":
q.add_log_cmd("CCSDS Handler: Disable tx clock manipulator") q.add_log_cmd("CCSDS Handler: Disable tx clock manipulator")
command = obyt + struct.pack("!I", CommandIds.DISABLE_TX_CLK_MANIPULATOR) command = obyt + struct.pack("!I", CommandId.DISABLE_TX_CLK_MANIPULATOR)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "7": if op_code == "7":
q.add_log_cmd("CCSDS Handler: Update tx data on rising edge of tx clock") q.add_log_cmd("CCSDS Handler: Update tx data on rising edge of tx clock")
command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_RISING_EDGE) command = obyt + struct.pack("!I", CommandId.UPDATE_ON_RISING_EDGE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
if op_code == "8": if op_code == "8":
q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock") q.add_log_cmd("CCSDS Handler: Update tx data on falling edge of tx clock")
command = obyt + struct.pack("!I", CommandIds.UPDATE_ON_FALLING_EDGE) command = obyt + struct.pack("!I", CommandId.UPDATE_ON_FALLING_EDGE)
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))

View File

@ -21,7 +21,7 @@ from tmtccmd.tc.pus_3_fsfw_hk import *
import eive_tmtc.config.object_ids as oids import eive_tmtc.config.object_ids as oids
from eive_tmtc.pus_tc.system.tcs import OpCode as TcsOpCodes from eive_tmtc.pus_tc.system.tcs import OpCode as TcsOpCodes
from eive_tmtc.pus_tc.devs.bpx_batt import BpxSetId from eive_tmtc.pus_tc.devs.bpx_batt import BpxSetId
from eive_tmtc.tmtc.core import SetIds as CoreSetIds from eive_tmtc.tmtc.core import SetId as CoreSetIds
from eive_tmtc.tmtc.power.common_power import SetId as GsSetIds from eive_tmtc.tmtc.power.common_power import SetId as GsSetIds
from eive_tmtc.pus_tc.devs.rad_sensor import SetId as RadSetIds from eive_tmtc.pus_tc.devs.rad_sensor import SetId as RadSetIds
from eive_tmtc.pus_tc.devs.mgms import MgmLis3SetId as MgmLis3SetIds_0_2 from eive_tmtc.pus_tc.devs.mgms import MgmLis3SetId as MgmLis3SetIds_0_2

View File

@ -18,7 +18,7 @@ from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger() LOGGER = get_console_logger()
class ActionIds(enum.IntEnum): class ActionId(enum.IntEnum):
LIST_DIR_INTO_FILE = 0 LIST_DIR_INTO_FILE = 0
SWITCH_REBOOT_FILE_HANDLING = 5 SWITCH_REBOOT_FILE_HANDLING = 5
RESET_REBOOT_COUNTER = 6 RESET_REBOOT_COUNTER = 6
@ -34,11 +34,11 @@ class ActionIds(enum.IntEnum):
FULL_REBOOT = 34 FULL_REBOOT = 34
class SetIds(enum.IntEnum): class SetId(enum.IntEnum):
HK = 5 HK = 5
class OpCodes: class OpCode:
REBOOT_XSC = ["0", "reboot_xsc"] REBOOT_XSC = ["0", "reboot_xsc"]
XSC_REBOOT_SELF = ["1", "reboot_self"] XSC_REBOOT_SELF = ["1", "reboot_self"]
XSC_REBOOT_0_0 = ["2", "reboot_00"] XSC_REBOOT_0_0 = ["2", "reboot_00"]
@ -89,60 +89,60 @@ class Copy(enum.IntEnum):
@tmtc_definitions_provider @tmtc_definitions_provider
def add_core_controller_definitions(defs: TmtcDefinitionWrapper): def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC) oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCodes.REBOOT_XSC, info=Info.REBOOT_XSC) oce.add(keys=OpCode.REBOOT_XSC, info=Info.REBOOT_XSC)
oce.add(keys=OpCodes.REBOOT_FULL, info=Info.REBOOT_FULL) oce.add(keys=OpCode.REBOOT_FULL, info=Info.REBOOT_FULL)
oce.add(keys=OpCodes.XSC_REBOOT_SELF, info="Reboot Self") oce.add(keys=OpCode.XSC_REBOOT_SELF, info="Reboot Self")
oce.add(keys=OpCodes.XSC_REBOOT_0_0, info="Reboot 0 0") oce.add(keys=OpCode.XSC_REBOOT_0_0, info="Reboot 0 0")
oce.add(keys=OpCodes.XSC_REBOOT_0_1, info="Reboot 0 1") oce.add(keys=OpCode.XSC_REBOOT_0_1, info="Reboot 0 1")
oce.add(keys=OpCodes.XSC_REBOOT_1_0, info="Reboot 1 0") oce.add(keys=OpCode.XSC_REBOOT_1_0, info="Reboot 1 0")
oce.add(keys=OpCodes.XSC_REBOOT_1_1, info="Reboot 1 1") oce.add(keys=OpCode.XSC_REBOOT_1_1, info="Reboot 1 1")
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP) oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add( oce.add(
keys=OpCodes.GET_HK, keys=OpCode.GET_HK,
info="Request housekeeping set", info="Request housekeeping set",
) )
oce.add( oce.add(
keys=OpCodes.ENABLE_REBOOT_FILE_HANDLING, keys=OpCode.ENABLE_REBOOT_FILE_HANDLING,
info="Enable reboot file handling", info="Enable reboot file handling",
) )
oce.add( oce.add(
keys=OpCodes.DISABLE_REBOOT_FILE_HANDLING, keys=OpCode.DISABLE_REBOOT_FILE_HANDLING,
info="Disable reboot file handling", info="Disable reboot file handling",
) )
oce.add( oce.add(
keys=OpCodes.RESET_ALL_REBOOT_COUNTERS, keys=OpCode.RESET_ALL_REBOOT_COUNTERS,
info="Reset all reboot counters", info="Reset all reboot counters",
) )
oce.add( oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_00, keys=OpCode.RESET_REBOOT_COUNTER_00,
info="Reset reboot counter 0 0", info="Reset reboot counter 0 0",
) )
oce.add( oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_01, keys=OpCode.RESET_REBOOT_COUNTER_01,
info="Reset reboot counter 0 1", info="Reset reboot counter 0 1",
) )
oce.add( oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_10, keys=OpCode.RESET_REBOOT_COUNTER_10,
info="Reset reboot counter 1 0", info="Reset reboot counter 1 0",
) )
oce.add( oce.add(
keys=OpCodes.RESET_REBOOT_COUNTER_11, keys=OpCode.RESET_REBOOT_COUNTER_11,
info="Reset reboot counter 1 1", info="Reset reboot counter 1 1",
) )
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_0, info=Info.OBSW_UPDATE_FROM_SD_0)
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1) oce.add(keys=OpCode.OBSW_UPDATE_FROM_SD_1, info=Info.OBSW_UPDATE_FROM_SD_1)
oce.add(keys=OpCodes.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP) oce.add(keys=OpCode.OBSW_UPDATE_FROM_TMP, info=Info.OBSW_UPDATE_FROM_TMP)
oce.add(keys=OpCodes.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0) oce.add(keys=OpCode.SWITCH_TO_SD_0, info=Info.SWITCH_TO_SD_0)
oce.add(keys=OpCodes.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1) oce.add(keys=OpCode.SWITCH_TO_SD_1, info=Info.SWITCH_TO_SD_1)
oce.add(keys=OpCodes.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS) oce.add(keys=OpCode.SWITCH_TO_BOTH_SD_CARDS, info=Info.SWITCH_TO_BOTH_SD_CARDS)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands(q: DefaultPusQueueHelper, op_code: str): def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.REBOOT_XSC: if op_code in OpCode.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params() reboot_self, chip_select, copy_select = determine_reboot_params()
add_xsc_reboot_cmd( add_xsc_reboot_cmd(
q=q, q=q,
@ -150,93 +150,93 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
chip=chip_select, chip=chip_select,
copy=copy_select, copy=copy_select,
) )
if op_code in OpCodes.REBOOT_FULL: if op_code in OpCode.REBOOT_FULL:
q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}") q.add_log_cmd(f"Core Command: {Info.REBOOT_FULL}")
q.add_pus_tc(create_full_reboot_cmds()) q.add_pus_tc(create_full_reboot_cmds())
if op_code in OpCodes.XSC_REBOOT_SELF: if op_code in OpCode.XSC_REBOOT_SELF:
add_xsc_reboot_cmd(q=q, reboot_self=True) add_xsc_reboot_cmd(q=q, reboot_self=True)
if op_code in OpCodes.XSC_REBOOT_0_0: if op_code in OpCode.XSC_REBOOT_0_0:
add_xsc_reboot_cmd( add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM q=q, reboot_self=False, chip=Chip.CHIP_0, copy=Copy.COPY_0_NOM
) )
if op_code in OpCodes.XSC_REBOOT_0_1: if op_code in OpCode.XSC_REBOOT_0_1:
add_xsc_reboot_cmd( add_xsc_reboot_cmd(
q=q, q=q,
reboot_self=False, reboot_self=False,
chip=Chip.CHIP_0, chip=Chip.CHIP_0,
copy=Copy.COPY_1_GOLD, copy=Copy.COPY_1_GOLD,
) )
if op_code in OpCodes.XSC_REBOOT_1_0: if op_code in OpCode.XSC_REBOOT_1_0:
add_xsc_reboot_cmd( add_xsc_reboot_cmd(
q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM q=q, reboot_self=False, chip=Chip.CHIP_1, copy=Copy.COPY_0_NOM
) )
if op_code in OpCodes.XSC_REBOOT_1_1: if op_code in OpCode.XSC_REBOOT_1_1:
add_xsc_reboot_cmd( add_xsc_reboot_cmd(
q=q, q=q,
reboot_self=False, reboot_self=False,
chip=Chip.CHIP_1, chip=Chip.CHIP_1,
copy=Copy.COPY_1_GOLD, copy=Copy.COPY_1_GOLD,
) )
if op_code in OpCodes.DISABLE_REBOOT_FILE_HANDLING: if op_code in OpCode.DISABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Disabling reboot file handling") q.add_log_cmd("Disabling reboot file handling")
user_data = bytearray([0]) user_data = bytearray([0])
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.SWITCH_REBOOT_FILE_HANDLING, action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data, user_data=user_data,
) )
) )
if op_code in OpCodes.ENABLE_REBOOT_FILE_HANDLING: if op_code in OpCode.ENABLE_REBOOT_FILE_HANDLING:
q.add_log_cmd("Enabling reboot file handling") q.add_log_cmd("Enabling reboot file handling")
user_data = bytearray([1]) user_data = bytearray([1])
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.SWITCH_REBOOT_FILE_HANDLING, action_id=ActionId.SWITCH_REBOOT_FILE_HANDLING,
user_data=user_data, user_data=user_data,
) )
) )
if op_code in OpCodes.RESET_ALL_REBOOT_COUNTERS: if op_code in OpCode.RESET_ALL_REBOOT_COUNTERS:
q.add_log_cmd("Resetting all reboot counters") q.add_log_cmd("Resetting all reboot counters")
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.RESET_REBOOT_COUNTER, action_id=ActionId.RESET_REBOOT_COUNTER,
) )
) )
if op_code in OpCodes.RESET_REBOOT_COUNTER_00: if op_code in OpCode.RESET_REBOOT_COUNTER_00:
reset_specific_boot_counter(q, 0, 0) reset_specific_boot_counter(q, 0, 0)
if op_code in OpCodes.RESET_REBOOT_COUNTER_01: if op_code in OpCode.RESET_REBOOT_COUNTER_01:
reset_specific_boot_counter(q, 0, 1) reset_specific_boot_counter(q, 0, 1)
if op_code in OpCodes.RESET_REBOOT_COUNTER_10: if op_code in OpCode.RESET_REBOOT_COUNTER_10:
reset_specific_boot_counter(q, 1, 0) reset_specific_boot_counter(q, 1, 0)
if op_code in OpCodes.RESET_REBOOT_COUNTER_11: if op_code in OpCode.RESET_REBOOT_COUNTER_11:
reset_specific_boot_counter(q, 1, 1) reset_specific_boot_counter(q, 1, 1)
if op_code in OpCodes.OBSW_UPDATE_FROM_SD_0: if op_code in OpCode.OBSW_UPDATE_FROM_SD_0:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0) q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_0)
q.add_pus_tc(pack_obsw_update_cmd(ActionIds.UPDATE_OBSW_FROM_SD_0)) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_0))
if op_code in OpCodes.OBSW_UPDATE_FROM_SD_1: if op_code in OpCode.OBSW_UPDATE_FROM_SD_1:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1) q.add_log_cmd(Info.OBSW_UPDATE_FROM_SD_1)
q.add_pus_tc(pack_obsw_update_cmd(ActionIds.UPDATE_OBSW_FROM_SD_1)) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_SD_1))
if op_code in OpCodes.OBSW_UPDATE_FROM_TMP: if op_code in OpCode.OBSW_UPDATE_FROM_TMP:
q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP) q.add_log_cmd(Info.OBSW_UPDATE_FROM_TMP)
q.add_pus_tc(pack_obsw_update_cmd(ActionIds.UPDATE_OBSW_FROM_TMP)) q.add_pus_tc(pack_obsw_update_cmd(ActionId.UPDATE_OBSW_FROM_TMP))
if op_code in OpCodes.SWITCH_TO_SD_0: if op_code in OpCode.SWITCH_TO_SD_0:
q.add_log_cmd(Info.SWITCH_TO_SD_0) q.add_log_cmd(Info.SWITCH_TO_SD_0)
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.SWITCH_TO_SD_0 object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_0
) )
) )
if op_code in OpCodes.SWITCH_TO_SD_1: if op_code in OpCode.SWITCH_TO_SD_1:
q.add_log_cmd(Info.SWITCH_TO_SD_1) q.add_log_cmd(Info.SWITCH_TO_SD_1)
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.SWITCH_TO_SD_1 object_id=CORE_CONTROLLER_ID, action_id=ActionId.SWITCH_TO_SD_1
) )
) )
if op_code in OpCodes.SWITCH_TO_BOTH_SD_CARDS: if op_code in OpCode.SWITCH_TO_BOTH_SD_CARDS:
while True: while True:
active_sd_card = int(input("Please specify active SD cqrd [0/1]: ")) active_sd_card = int(input("Please specify active SD cqrd [0/1]: "))
if active_sd_card not in [0, 1]: if active_sd_card not in [0, 1]:
@ -246,13 +246,13 @@ def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.SWITCH_TO_BOTH_SD_CARDS, action_id=ActionId.SWITCH_TO_BOTH_SD_CARDS,
user_data=bytes([active_sd_card]), user_data=bytes([active_sd_card]),
) )
) )
if op_code in OpCodes.GET_HK: if op_code in OpCode.GET_HK:
q.add_log_cmd("Requesting housekeeping set") q.add_log_cmd("Requesting housekeeping set")
sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetIds.HK) sid = make_sid(object_id=CORE_CONTROLLER_ID, set_id=SetId.HK)
q.add_pus_tc(generate_one_hk_command(sid)) q.add_pus_tc(generate_one_hk_command(sid))
@ -261,7 +261,7 @@ def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
action_id=ActionIds.RESET_REBOOT_COUNTER, action_id=ActionId.RESET_REBOOT_COUNTER,
user_data=bytes([chip, copy]), user_data=bytes([chip, copy]),
) )
) )
@ -269,7 +269,7 @@ def reset_specific_boot_counter(q: DefaultPusQueueHelper, chip: int, copy: int):
def create_full_reboot_cmds() -> PusTelecommand: def create_full_reboot_cmds() -> PusTelecommand:
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.FULL_REBOOT object_id=CORE_CONTROLLER_ID, action_id=ActionId.FULL_REBOOT
) )
@ -342,12 +342,12 @@ def create_xsc_reboot_cmds(
tc_data.append(chip) tc_data.append(chip)
tc_data.append(copy) tc_data.append(copy)
return make_fsfw_action_cmd( return make_fsfw_action_cmd(
object_id=CORE_CONTROLLER_ID, action_id=ActionIds.XSC_REBOOT, user_data=tc_data object_id=CORE_CONTROLLER_ID, action_id=ActionId.XSC_REBOOT, user_data=tc_data
) )
def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes): def handle_core_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
if set_id == SetIds.HK: if set_id == SetId.HK:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
fmt_str = "!fff" fmt_str = "!fff"
inc_len = struct.calcsize(fmt_str) inc_len = struct.calcsize(fmt_str)

View File

@ -4,7 +4,7 @@ from typing import List
from eive_tmtc.gomspace.gomspace_common import ( from eive_tmtc.gomspace.gomspace_common import (
pack_set_u8_param_command, pack_set_u8_param_command,
Channel, Channel,
GomspaceDeviceActionIds, GomspaceDeviceActionId,
prompt_and_pack_set_integer_param_command, prompt_and_pack_set_integer_param_command,
prompt_and_pack_get_param_command, prompt_and_pack_get_param_command,
pack_request_config_command, pack_request_config_command,
@ -144,7 +144,7 @@ def pack_common_gomspace_cmds(
q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_SWITCH_V_I}") q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_SWITCH_V_I}")
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_SWITCH_V_I object_id=objb, action_id=GomspaceDeviceActionId.PRINT_SWITCH_V_I
) )
) )
if op_code in PowerOpCodes.REBOOT: if op_code in PowerOpCodes.REBOOT:
@ -154,7 +154,7 @@ def pack_common_gomspace_cmds(
q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_LATCHUPS}") q.add_log_cmd(f"{prefix}: {PowerInfo.PRINT_LATCHUPS}")
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=objb, action_id=GomspaceDeviceActionIds.PRINT_LATCHUPS object_id=objb, action_id=GomspaceDeviceActionId.PRINT_LATCHUPS
) )
) )
if op_code in GomspaceOpCode.SET_INTEGER_PARAM: if op_code in GomspaceOpCode.SET_INTEGER_PARAM:
@ -189,7 +189,7 @@ def pack_common_gomspace_cmds(
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
action_id=GomspaceDeviceActionIds.SAVE_TABLE, action_id=GomspaceDeviceActionId.SAVE_TABLE,
user_data=bytes([source_table]), user_data=bytes([source_table]),
) )
) )
@ -204,7 +204,7 @@ def pack_common_gomspace_cmds(
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
action_id=GomspaceDeviceActionIds.SAVE_TABLE_DEFAULT, action_id=GomspaceDeviceActionId.SAVE_TABLE_DEFAULT,
user_data=bytes([source_table]), user_data=bytes([source_table]),
) )
) )
@ -232,7 +232,7 @@ def pack_common_gomspace_cmds(
q.add_pus_tc( q.add_pus_tc(
make_fsfw_action_cmd( make_fsfw_action_cmd(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
action_id=GomspaceDeviceActionIds.LOAD_TABLE, action_id=GomspaceDeviceActionId.LOAD_TABLE,
user_data=bytes([source_table, target_table]), user_data=bytes([source_table, target_table]),
) )
) )

View File

@ -10,7 +10,7 @@ from eive_tmtc.tmtc.power.common_power import (
from tmtccmd.util import ObjectIdBase from tmtccmd.util import ObjectIdBase
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionIds from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
PDU_1_HANDLER_ID, PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID, PDU_2_HANDLER_ID,
@ -439,7 +439,7 @@ def handle_acu_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
def handle_get_param_data_reply( def handle_get_param_data_reply(
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray
): ):
if action_id == GomspaceDeviceActionIds.PARAM_GET: if action_id == GomspaceDeviceActionId.PARAM_GET:
pw.dlog(f"Parameter Get Request received for object {obj_id}") pw.dlog(f"Parameter Get Request received for object {obj_id}")
header_list = [ header_list = [
"Gomspace Request Code", "Gomspace Request Code",
@ -461,7 +461,7 @@ def handle_get_param_data_reply(
] ]
pw.dlog(f"{header_list}") pw.dlog(f"{header_list}")
pw.dlog(f"{content_list}") pw.dlog(f"{content_list}")
elif action_id == GomspaceDeviceActionIds.REQUEST_CONFIG_TABLE: elif action_id == GomspaceDeviceActionId.REQUEST_CONFIG_TABLE:
print(f"Received config table with size {len(custom_data)} for object {obj_id}") print(f"Received config table with size {len(custom_data)} for object {obj_id}")
if obj_id.as_bytes == PDU_1_HANDLER_ID or obj_id.as_bytes == PDU_2_HANDLER_ID: if obj_id.as_bytes == PDU_1_HANDLER_ID or obj_id.as_bytes == PDU_2_HANDLER_ID:
pdu_config_table_handler(pw, custom_data, obj_id) pdu_config_table_handler(pw, custom_data, obj_id)

View File

@ -22,7 +22,7 @@ from tmtccmd import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()
class OpCodes: class OpCode:
MANUAL_DEPLOYMENT = "man_depl" MANUAL_DEPLOYMENT = "man_depl"
@ -30,14 +30,14 @@ class Info:
MANUAL_DEPLOYMENT = "Manual Solar Array Deployment" MANUAL_DEPLOYMENT = "Manual Solar Array Deployment"
class ActionIds: class ActionId:
MANUAL_DEPLOYMENT = 5 MANUAL_DEPLOYMENT = 5
@tmtc_definitions_provider @tmtc_definitions_provider
def pack_sa_depl_cmds(defs: TmtcDefinitionWrapper): def pack_sa_depl_cmds(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry() oce = OpCodeEntry()
oce.add(keys=OpCodes.MANUAL_DEPLOYMENT, info=Info.MANUAL_DEPLOYMENT) oce.add(keys=OpCode.MANUAL_DEPLOYMENT, info=Info.MANUAL_DEPLOYMENT)
defs.add_service( defs.add_service(
name=CustomServiceList.SA_DEPLYOMENT, name=CustomServiceList.SA_DEPLYOMENT,
info="Solar Array Deployment", info="Solar Array Deployment",
@ -73,6 +73,6 @@ def pack_solar_array_deployment_test_into(p: ServiceProviderParams):
dry_run_str = "" dry_run_str = ""
q.add_log_cmd(f"Testing S/A Deployment with burn time {burn_time}{dry_run_str}") q.add_log_cmd(f"Testing S/A Deployment with burn time {burn_time}{dry_run_str}")
command = make_fsfw_action_cmd( command = make_fsfw_action_cmd(
SOLAR_ARRAY_DEPLOYMENT_ID, ActionIds.MANUAL_DEPLOYMENT, user_data SOLAR_ARRAY_DEPLOYMENT_ID, ActionId.MANUAL_DEPLOYMENT, user_data
) )
q.add_pus_tc(command) q.add_pus_tc(command)