scex code additions #115

Merged
muellerr merged 16 commits from scex-additions into main 2022-09-29 11:48:27 +02:00
42 changed files with 186 additions and 123 deletions
Showing only changes of commit 219116a773 - Show all commits

View File

@ -15,7 +15,7 @@
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s 17 -o 0 -d 3" /> <option name="PARAMETERS" value="-s 17 -o 0 -d 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" /> <option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />

View File

@ -3,9 +3,6 @@
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" /> <option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" /> <option name="IS_MODULE_SDK" value="true" />
@ -15,7 +12,7 @@
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" /> <option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />

View File

@ -208,4 +208,3 @@ Event ID (dec); Event ID (hex); Name; Severity; Description; File Path
13701;0x3585;REBOOT_SW;MEDIUM; Software reboot occurred. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy;bsp_q7s/core/CoreController.h 13701;0x3585;REBOOT_SW;MEDIUM; Software reboot occurred. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy;bsp_q7s/core/CoreController.h
13702;0x3586;REBOOT_MECHANISM_TRIGGERED;MEDIUM;The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots;bsp_q7s/core/CoreController.h 13702;0x3586;REBOOT_MECHANISM_TRIGGERED;MEDIUM;The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots;bsp_q7s/core/CoreController.h
13703;0x3587;REBOOT_HW;MEDIUM;;bsp_q7s/core/CoreController.h 13703;0x3587;REBOOT_HW;MEDIUM;;bsp_q7s/core/CoreController.h
13800;0x35e8;EXPERIMENT_TIMEDOUT;LOW;;mission/devices/devicedefinitions/ScexDefinitions.h

1 Event ID (dec) Event ID (hex) Name Severity Description File Path
208 13625 13701 0x3539 0x3585 ACK_RECEPTION_FAILURE REBOOT_SW LOW MEDIUM Failed to receive acknowledgment report P1: Return value P2: Apid of command for which the reception of the acknowledgment report failed Software reboot occurred. Can also be a systemd reboot. P1: Current Chip, P2: Current Copy linux/devices/ploc/PlocSupvHelper.h bsp_q7s/core/CoreController.h
209 13626 13702 0x353a 0x3586 EXE_RECEPTION_FAILURE REBOOT_MECHANISM_TRIGGERED LOW MEDIUM Failed to receive execution report P1: Return value P2: Apid of command for which the reception of the execution report failed The reboot mechanism was triggered. P1: First 16 bits: Last Chip, Last 16 bits: Last Copy, P2: Each byte is the respective reboot count for the slots linux/devices/ploc/PlocSupvHelper.h bsp_q7s/core/CoreController.h
210 13627 13703 0x353b 0x3587 WRITE_MEMORY_FAILED REBOOT_HW LOW MEDIUM Update procedure failed when sending packet. P1: First byte percent, third and fourth byte Sequence Count, P2: Bytes written linux/devices/ploc/PlocSupvHelper.h bsp_q7s/core/CoreController.h
13628 0x353c SUPV_REPLY_SIZE_MISSMATCH LOW linux/devices/ploc/PlocSupvHelper.h

View File

@ -18,13 +18,17 @@ class EiveHookObject(TmTcCfgHookBase):
return get_eive_service_op_code_dict() return get_eive_service_op_code_dict()
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com_if import create_communication_interface_default from tmtccmd.config.com_if import (
create_com_interface_default,
create_com_interface_cfg_default,
)
return create_communication_interface_default( cfg = create_com_interface_cfg_default(
com_if_key=com_if_key, com_if_key=com_if_key,
json_cfg_path=self.json_cfg_path, json_cfg_path=self.json_cfg_path,
space_packet_ids=SPACE_PACKET_IDS, space_packet_ids=SPACE_PACKET_IDS,
) )
return create_com_interface_default(cfg)
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
from config.custom_mode_op import custom_mode_operation from config.custom_mode_op import custom_mode_operation

View File

@ -46,7 +46,6 @@
0x44330015;PLOC_MPSOC_HANDLER 0x44330015;PLOC_MPSOC_HANDLER
0x44330016;PLOC_SUPERVISOR_HANDLER 0x44330016;PLOC_SUPERVISOR_HANDLER
0x44330017;PLOC_SUPERVISOR_HELPER 0x44330017;PLOC_SUPERVISOR_HELPER
0x44330032;SCEX
0x444100A2;SOLAR_ARRAY_DEPL_HANDLER 0x444100A2;SOLAR_ARRAY_DEPL_HANDLER
0x444100A4;HEATER_HANDLER 0x444100A4;HEATER_HANDLER
0x44420004;TMP1075_HANDLER_1 0x44420004;TMP1075_HANDLER_1
@ -70,7 +69,6 @@
0x445300A3;SYRLINKS_HK_HANDLER 0x445300A3;SYRLINKS_HK_HANDLER
0x49000000;ARDUINO_COM_IF 0x49000000;ARDUINO_COM_IF
0x49010005;GPIO_IF 0x49010005;GPIO_IF
0x49010006;SCEX_UART_READER
0x49020004;SPI_MAIN_COM_IF 0x49020004;SPI_MAIN_COM_IF
0x49020005;SPI_RW_COM_IF 0x49020005;SPI_RW_COM_IF
0x49020006;SPI_RTD_COM_IF 0x49020006;SPI_RTD_COM_IF

1 0x00005060 P60DOCK_TEST_TASK
46 0x44330015 PLOC_MPSOC_HANDLER
47 0x44330016 PLOC_SUPERVISOR_HANDLER
48 0x44330017 PLOC_SUPERVISOR_HELPER
0x444100A2 SOLAR_ARRAY_DEPL_HANDLER
49 0x444100A4 0x444100A2 HEATER_HANDLER SOLAR_ARRAY_DEPL_HANDLER
50 0x44420004 0x444100A4 TMP1075_HANDLER_1 HEATER_HANDLER
51 0x44420005 0x44420004 TMP1075_HANDLER_2 TMP1075_HANDLER_1
69 0x49000000 0x445300A3 ARDUINO_COM_IF SYRLINKS_HK_HANDLER
70 0x49010005 0x49000000 GPIO_IF ARDUINO_COM_IF
71 0x49020004 0x49010005 SPI_MAIN_COM_IF GPIO_IF
0x49020005 SPI_RW_COM_IF
72 0x49020006 0x49020004 SPI_RTD_COM_IF SPI_MAIN_COM_IF
73 0x49030003 0x49020005 UART_COM_IF SPI_RW_COM_IF
74 0x49040002 0x49020006 I2C_COM_IF SPI_RTD_COM_IF

2
deps/spacepackets vendored

@ -1 +1 @@
Subproject commit 0fce08fa9a37b50096198ea563ed22fc0447e1b4 Subproject commit 13a54713ae283faf0b272dc6c1373ed459efb9b6

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 598aecbcae8a2d194f17ea27fd6a274b114a83c0 Subproject commit 43a5e9ef3fa618c5296137dd76b4f25a1732b0f3

View File

@ -8,7 +8,7 @@ import struct
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -78,7 +78,7 @@ def add_acu_cmds(defs: TmTcDefWrapper):
) )
def pack_acu_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Handling ACU command") q.add_log_cmd("Handling ACU command")
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("ACU: Print channel stats") q.add_log_cmd("ACU: Print channel stats")
@ -143,7 +143,7 @@ class ACUTestProcedure:
off = False off = False
def pack_test_cmds(object_id: ObjectIdU32, q: QueueHelper): def pack_test_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper):
if ACUTestProcedure.all or ACUTestProcedure.reboot: if ACUTestProcedure.all or ACUTestProcedure.reboot:
q.add_log_cmd("ACU: Reboot") q.add_log_cmd("ACU: Reboot")
q.add_pus_tc(gs.pack_reboot_command(object_id)) q.add_pus_tc(gs.pack_reboot_command(object_id))

View File

@ -1,5 +1,5 @@
from config.object_ids import BPX_HANDLER_ID from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
@ -24,7 +24,7 @@ class BpxOpCodes:
REBOOT = ["4", "reboot"] REBOOT = ["4", "reboot"]
def pack_bpx_commands(q: QueueHelper, op_code: str): def pack_bpx_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in BpxOpCodes.HK: if op_code in BpxOpCodes.HK:
q.add_log_cmd("Requesting BPX battery HK set") q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)

View File

@ -8,7 +8,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -32,7 +32,9 @@ class CommandIds:
UPDATE_ON_FALLING_EDGE = 8 UPDATE_ON_FALLING_EDGE = 8
def pack_ccsds_handler_test(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ccsds_handler_test(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
obyt = object_id.as_bytes obyt = object_id.as_bytes
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code == "0": if op_code == "0":

View File

@ -2,7 +2,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
@ -35,7 +35,7 @@ def add_gps_cmds(defs: TmTcDefWrapper):
) )
def pack_gps_command(object_id: bytes, q: QueueHelper, op_code: str): def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.RESET_GNSS: if op_code in OpCodes.RESET_GNSS:
# TODO: This needs to be re-implemented # TODO: This needs to be re-implemented
LOGGER.warning("Reset pin handling needs to be re-implemented") LOGGER.warning("Reset pin handling needs to be re-implemented")

View File

@ -8,7 +8,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.object_ids import get_object_ids from config.object_ids import get_object_ids
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util.obj_id import ObjectIdU32 from tmtccmd.util.obj_id import ObjectIdU32
from tmtccmd.tc.pus_201_fsfw_health import ( from tmtccmd.tc.pus_201_fsfw_health import (
pack_set_health_cmd_data, pack_set_health_cmd_data,
@ -66,7 +66,7 @@ def add_heater_cmds(defs: TmTcDefWrapper):
) )
def pack_heater_cmds(object_id: bytearray, op_code: str, q: QueueHelper): def pack_heater_cmds(object_id: bytearray, op_code: str, q: DefaultPusQueueHelper):
if op_code in OpCodes.HEATER_CMD: if op_code in OpCodes.HEATER_CMD:
q.add_log_cmd("Heater Switching") q.add_log_cmd("Heater Switching")
heater_number = prompt_heater() heater_number = prompt_heater()
@ -171,7 +171,7 @@ def prompt_heater() -> int:
def health_cmd( def health_cmd(
q: QueueHelper, q: DefaultPusQueueHelper,
heater_idx: int, heater_idx: int,
object_id: ObjectIdU32, object_id: ObjectIdU32,
health: FsfwHealth, health: FsfwHealth,

View File

@ -8,7 +8,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -44,7 +44,7 @@ class ImtqActionIds:
read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D])
def pack_imtq_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd( q.add_log_cmd(
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}" f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
) )

View File

@ -5,7 +5,7 @@
@author J. Meier @author J. Meier
@date 13.12.2020 @date 13.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER from config.object_ids import P60_DOCK_HANDLER
@ -82,7 +82,7 @@ class P60DockHkTable:
wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size)
def pack_p60dock_cmds(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code in P60OpCodes.STACK_3V3_ON: if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(Info.STACK_3V3_ON) q.add_log_cmd(Info.STACK_3V3_ON)

View File

@ -6,7 +6,7 @@
@date 22.11.2021 @date 22.11.2021
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
class CommandIds: class CommandIds:
@ -16,7 +16,9 @@ class CommandIds:
PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1]) PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1])
def pack_pdec_handler_test(object_id: bytearray, q: QueueHelper, op_code: str): def pack_pdec_handler_test(
object_id: bytearray, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Testing PDEC handler with object id: {object_id.hex()}") q.add_log_cmd(f"Testing PDEC handler with object id: {object_id.hex()}")
if op_code == "0": if op_code == "0":
q.add_log_cmd("PDEC Handler: Print CLCW") q.add_log_cmd("PDEC Handler: Print CLCW")

View File

@ -5,7 +5,7 @@
""" """
import gomspace.gomspace_common as gs import gomspace.gomspace_common as gs
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
make_sid, make_sid,
@ -55,7 +55,7 @@ class PDU1TestProcedure:
turn_channel_3_off = False turn_channel_3_off = False
def pack_pdu1_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Commanding PDU1") q.add_log_cmd("Commanding PDU1")
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code == Pdu1OpCodes.TCS_BOARD_ON.value: if op_code == Pdu1OpCodes.TCS_BOARD_ON.value:

View File

@ -6,7 +6,7 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
generate_one_diag_command, generate_one_diag_command,
@ -65,7 +65,7 @@ class PDU2TestProcedure:
request_hk_table = False request_hk_table = False
def pack_pdu2_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Testing PDU2") q.add_log_cmd("Testing PDU2")
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value: if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:

View File

@ -9,7 +9,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -17,7 +17,9 @@ class ActionIds:
DUMP_MRAM = 1 DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_memory_dumper_cmd(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}" f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}"
) )

View File

@ -11,7 +11,7 @@ import enum
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
@ -66,7 +66,9 @@ class PlocReplyIds(enum.IntEnum):
TM_CAM_CMD_RPT = 19 TM_CAM_CMD_RPT = 19
def pack_ploc_mpsoc_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_mpsoc_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
) )

View File

@ -10,7 +10,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
@ -106,7 +106,9 @@ class SupvHkIds:
BOOT_STATUS_REPORT = 53 BOOT_STATUS_REPORT = 53
def pack_ploc_supv_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_supv_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}")
obyt = object_id.as_bytes obyt = object_id.as_bytes
if op_code == "0": if op_code == "0":

View File

@ -7,7 +7,7 @@ from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper from tmtccmd.config import TmTcDefWrapper
from tmtccmd.config.tmtc import OpCodeEntry from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -149,7 +149,7 @@ def add_pl_pcdu_cmds(defs: TmTcDefWrapper):
defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce) defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce)
def pack_pl_pcdu_commands(q: QueueHelper, op_code: str): def pack_pl_pcdu_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.SWITCH_ON: if op_code in OpCodes.SWITCH_ON:
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0) pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0)
if op_code in OpCodes.SWITCH_OFF: if op_code in OpCodes.SWITCH_OFF:
@ -215,7 +215,7 @@ def pack_pl_pcdu_commands(q: QueueHelper, op_code: str):
) )
def hpa_on_procedure(q: QueueHelper): def hpa_on_procedure(q: DefaultPusQueueHelper):
delay_dro_to_x8 = request_wait_time() delay_dro_to_x8 = request_wait_time()
if delay_dro_to_x8 is None: if delay_dro_to_x8 is None:
delay_dro_to_x8 = 900 delay_dro_to_x8 = 900
@ -389,7 +389,7 @@ def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int:
) )
def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str): def pack_wait_time_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
wait_time = request_wait_time() wait_time = request_wait_time()
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}") q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None: if wait_time is None:
@ -403,7 +403,7 @@ def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str):
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str): def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error") q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data( param_data = pack_boolean_parameter_app_data(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
@ -411,7 +411,9 @@ def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str):
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_pl_pcdu_mode_cmd(q: QueueHelper, info: str, mode: Modes, submode: int): def pack_pl_pcdu_mode_cmd(
q: DefaultPusQueueHelper, info: str, mode: Modes, submode: int
):
q.add_log_cmd(info) q.add_log_cmd(info)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode) mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
q.add_pus_tc( q.add_pus_tc(

View File

@ -12,7 +12,7 @@ from config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data, Modes from pus_tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -61,7 +61,9 @@ def add_rad_sens_cmds(defs: TmTcDefWrapper):
) )
def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_rad_sensor_test_into(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}") q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}")
if op_code in OpCodes.ON: if op_code in OpCodes.ON:
@ -87,7 +89,9 @@ def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: s
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def rad_sensor_mode_cmd(object_id: ObjectIdU32, mode: Modes, info: str, q: QueueHelper): def rad_sensor_mode_cmd(
object_id: ObjectIdU32, mode: Modes, info: str, q: DefaultPusQueueHelper
):
q.add_log_cmd(f"Rad sensor: {info}") q.add_log_cmd(f"Rad sensor: {info}")
mode_data = pack_mode_data(object_id.as_bytes, mode, 0) mode_data = pack_mode_data(object_id.as_bytes, mode, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -7,7 +7,7 @@
import struct import struct
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
generate_one_diag_command, generate_one_diag_command,
@ -115,7 +115,7 @@ def add_rw_cmds(defs: TmTcDefWrapper):
def pack_single_rw_test_into( def pack_single_rw_test_into(
object_id: bytes, rw_idx: int, q: QueueHelper, op_code: str object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
): ):
if op_code in OpCodesDevs.SPEED: if op_code in OpCodesDevs.SPEED:
speed = int(input("Specify speed [0.1 RPM]: ")) speed = int(input("Specify speed [0.1 RPM]: "))
@ -154,7 +154,7 @@ def pack_single_rw_test_into(
) )
def pack_rw_ass_cmds(q: QueueHelper, object_id: bytes, op_code: str): def pack_rw_ass_cmds(q: DefaultPusQueueHelper, object_id: bytes, op_code: str):
if op_code in OpCodesAss.OFF: if op_code in OpCodesAss.OFF:
data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0) data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
q.add_pus_tc( q.add_pus_tc(

View File

@ -5,7 +5,7 @@ from pus_tc.devs.pdec_handler import CommandIds
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices
import config.object_ids as oids import config.object_ids as oids
@ -55,7 +55,9 @@ def specify_rtd_cmds(defs: TmTcDefWrapper):
) )
def pack_rtd_commands(op_code: str, object_id: Optional[ObjectIdU32], q: QueueHelper): def pack_rtd_commands(
op_code: str, object_id: Optional[ObjectIdU32], q: DefaultPusQueueHelper
):
if object_id is not None and object_id not in RTD_IDS: if object_id is not None and object_id not in RTD_IDS:
print("Specified object ID not a valid RTD ID") print("Specified object ID not a valid RTD ID")
object_id = None object_id = None

View File

@ -2,7 +2,7 @@ import enum
import json import json
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.config import OpCodeEntry, TmTcDefWrapper from tmtccmd.config import OpCodeEntry, TmTcDefWrapper
from config.object_ids import SCEX_HANDLER_ID from config.object_ids import SCEX_HANDLER_ID
@ -56,13 +56,11 @@ def add_scex_cmds(defs: TmTcDefWrapper):
oce.add(keys=OpCodes.FRAM, info=Info.FRAM) oce.add(keys=OpCodes.FRAM, info=Info.FRAM)
defs.add_service( defs.add_service(
name=CustomServiceList.SCEX.value, name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce
info="SCEX Device",
op_code_entry=oce
) )
def pack_scex_cmds(q: QueueHelper, op_code: str): def pack_scex_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.PING: if op_code in OpCodes.PING:
q.add_log_cmd(Info.PING) q.add_log_cmd(Info.PING)
app_data = bytes([0]) app_data = bytes([0])
@ -70,17 +68,23 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
if op_code in OpCodes.ION_CMD: if op_code in OpCodes.ION_CMD:
q.add_log_cmd(Info.ION_CMD) q.add_log_cmd(Info.ION_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ION_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ION_CMD, app_data)
)
if op_code in OpCodes.TEMP_CMD: if op_code in OpCodes.TEMP_CMD:
q.add_log_cmd(Info.TEMP_CMD) q.add_log_cmd(Info.TEMP_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.TEMP_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.TEMP_CMD, app_data)
)
if op_code in OpCodes.EXP_STATUS_CMD: if op_code in OpCodes.EXP_STATUS_CMD:
q.add_log_cmd(Info.EXP_STATUS_CMD) q.add_log_cmd(Info.EXP_STATUS_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.EXP_STATUS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.EXP_STATUS_CMD, app_data)
)
# one cell # one cell
if op_code in OpCodes.ONE_CELLS_CMD: if op_code in OpCodes.ONE_CELLS_CMD:
@ -126,7 +130,9 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
app_data.append(dac_weight2[cn]) app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn]) app_data.append(dac_weight3[cn])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ONE_CELLS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ONE_CELLS_CMD, app_data)
)
if op_code in OpCodes.ALL_CELLS_CMD: if op_code in OpCodes.ALL_CELLS_CMD:
q.add_log_cmd(Info.ALL_CELLS_CMD) q.add_log_cmd(Info.ALL_CELLS_CMD)
@ -156,7 +162,9 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
app_data.append(dac_weight2[cn]) app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn]) app_data.append(dac_weight3[cn])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ALL_CELLS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ALL_CELLS_CMD, app_data)
)
if op_code in OpCodes.FRAM: if op_code in OpCodes.FRAM:
q.add_log_cmd(Info.FRAM) q.add_log_cmd(Info.FRAM)

View File

@ -6,14 +6,16 @@
@date 15.02.2021 @date 15.02.2021
""" """
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
class ActionIds: class ActionIds:
DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5]) DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5])
def pack_solar_array_deployment_test_into(object_id: bytearray, q: QueueHelper): def pack_solar_array_deployment_test_into(
object_id: bytearray, q: DefaultPusQueueHelper
):
q.add_log_cmd("Testing S/A Deployment") q.add_log_cmd("Testing S/A Deployment")
command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS

View File

@ -11,7 +11,7 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
@ -150,7 +150,9 @@ class Submode:
FIRMWARE = 2 FIRMWARE = 2
def pack_star_tracker_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_star_tracker_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Generate command for star tracker with object id: {object_id.as_hex_string}" f"Generate command for star tracker with object id: {object_id.as_hex_string}"
) )

View File

@ -11,7 +11,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -24,7 +24,9 @@ class ImagePathDefs:
uploadFile = "/mnt/sd0/startracker/gemma.bin" uploadFile = "/mnt/sd0/startracker/gemma.bin"
def pack_str_img_helper_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_str_img_helper_command(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Testing star tracker image helper object id: {object_id.as_hex_string}" f"Testing star tracker image helper object id: {object_id.as_hex_string}"
) )

View File

@ -5,7 +5,7 @@
@author J. Meier @author J. Meier
@date 13.12.2020 @date 13.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
@ -37,7 +37,9 @@ class CommandIds:
DISABLE_DEBUG = 21 DISABLE_DEBUG = 21
def pack_syrlinks_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_syrlinks_command(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
obyt = object_id.as_bytes obyt = object_id.as_bytes
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
if op_code == "0": if op_code == "0":

View File

@ -7,7 +7,7 @@
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -33,7 +33,9 @@ class Tmp1075ActionIds:
start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02]) start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02])
def pack_tmp1075_test_into(object_id: ObjectIdU32, op_code: str, q: QueueHelper): def pack_tmp1075_test_into(
object_id: ObjectIdU32, op_code: str, q: DefaultPusQueueHelper
):
q.add_log_cmd( q.add_log_cmd(
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}" f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
) )

View File

@ -4,11 +4,14 @@ from typing import cast
from pus_tc.devs.rtd import pack_rtd_commands from pus_tc.devs.rtd import pack_rtd_commands
from pus_tc.devs.scex import pack_scex_cmds from pus_tc.devs.scex import pack_scex_cmds
from pus_tc.system.controllers import pack_cmd_ctrl_to_prompted_mode, get_object_from_op_code from pus_tc.system.controllers import (
pack_cmd_ctrl_to_prompted_mode,
get_object_from_op_code,
)
from tmtccmd import DefaultProcedureInfo from tmtccmd import DefaultProcedureInfo
from tmtccmd.config import CoreServiceList from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import FeedWrapper from tmtccmd.tc import FeedWrapper, DefaultPusQueueHelper
from tmtccmd.tc.pus_5_event import ( from tmtccmd.tc.pus_5_event import (
pack_generic_service_5_test_into, pack_generic_service_5_test_into,
) )
@ -73,8 +76,9 @@ from tmtccmd.util import ObjectIdU32
LOGGER = get_console_logger() LOGGER = get_console_logger()
def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper, gui: bool): def handle_default_procedure(
queue_helper = wrapper.queue_helper info: DefaultProcedureInfo, queue_helper: DefaultPusQueueHelper, gui: bool
):
service = info.service service = info.service
op_code = info.op_code op_code = info.op_code
obj_id_man = get_object_ids() obj_id_man = get_object_ids()
@ -199,9 +203,7 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper, g
if service == CustomServiceList.TIME.value: if service == CustomServiceList.TIME.value:
return pack_set_current_time_ascii_command(q=queue_helper) return pack_set_current_time_ascii_command(q=queue_helper)
if service == CustomServiceList.RW_ASSEMBLY.value: if service == CustomServiceList.RW_ASSEMBLY.value:
return pack_rw_ass_cmds( return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code)
q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code
)
if service == CustomServiceList.CONTROLLERS.value: if service == CustomServiceList.CONTROLLERS.value:
return pack_cmd_ctrl_to_prompted_mode( return pack_cmd_ctrl_to_prompted_mode(
q=queue_helper, object_id=get_object_from_op_code(op_code), gui=gui q=queue_helper, object_id=get_object_from_op_code(op_code), gui=gui

View File

@ -6,14 +6,14 @@
@date 02.05.2020 @date 02.05.2020
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from config.object_ids import TEST_DEVICE_ID from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(q: QueueHelper): def pack_service200_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 200") q.add_log_cmd("Testing Service 200")
# Object ID: Dummy Device # Object ID: Dummy Device
obj_id = TEST_DEVICE_OBJ_ID obj_id = TEST_DEVICE_OBJ_ID

View File

@ -1,6 +1,6 @@
import enum import enum
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
@ -30,7 +30,7 @@ class DualSideSubmodes(enum.IntEnum):
DUAL_SIDE = 2 DUAL_SIDE = 2
def pack_acs_command(q: QueueHelper, op_code: str): def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
if op_code in AcsOpCodes.ACS_ASS_A_SIDE: if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
command_mode( command_mode(
object_id=ACS_BOARD_ASS_ID, object_id=ACS_BOARD_ASS_ID,
@ -89,7 +89,7 @@ def pack_acs_command(q: QueueHelper, op_code: str):
) )
def pack_sus_cmds(q: QueueHelper, op_code: str): def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE: if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
command_mode( command_mode(
object_id=SUS_BOARD_ASS_ID, object_id=SUS_BOARD_ASS_ID,

View File

@ -1,7 +1,7 @@
from typing import Union from typing import Union
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
@ -9,7 +9,7 @@ def command_mode(
object_id: bytes, object_id: bytes,
mode: Union[int, Modes], mode: Union[int, Modes],
submode: int, submode: int,
q: QueueHelper, q: DefaultPusQueueHelper,
info: str, info: str,
): ):
q.add_log_cmd(info) q.add_log_cmd(info)

View File

@ -1,4 +1,4 @@
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -18,7 +18,9 @@ class Info:
CORE_CONTROLLER = "ACS controller" CORE_CONTROLLER = "ACS controller"
def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui: bool): def pack_cmd_ctrl_to_prompted_mode(
q: DefaultPusQueueHelper, object_id: ObjectIdU32, gui: bool
):
param_list = [ param_list = [
{"name": "Mode", "defaultValue": "2"}, {"name": "Mode", "defaultValue": "2"},
{"name": "Submode", "defaultValue": "0"}, {"name": "Submode", "defaultValue": "0"},
@ -41,7 +43,7 @@ def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui:
) )
def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_off(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.OFF, mode=Modes.OFF,
@ -51,7 +53,7 @@ def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32):
) )
def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.ON, mode=Modes.ON,
@ -61,7 +63,7 @@ def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32):
) )
def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_nml(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.NORMAL, mode=Modes.NORMAL,

View File

@ -3,7 +3,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper from tmtccmd.config import TmTcDefWrapper
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
@ -112,7 +112,7 @@ def add_core_controller_definitions(defs: TmTcDefWrapper):
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
def pack_core_commands(q: QueueHelper, op_code: str): def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.REBOOT_XSC: if op_code in OpCodes.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params() reboot_self, chip_select, copy_select = determine_reboot_params()
perform_reboot_cmd( perform_reboot_cmd(
@ -231,7 +231,7 @@ def determine_reboot_params() -> (bool, Chip, Copy):
def perform_reboot_cmd( def perform_reboot_cmd(
q: QueueHelper, q: DefaultPusQueueHelper,
reboot_self: bool, reboot_self: bool,
chip: Chip = Chip.NONE, chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE, copy: Copy = Copy.NONE,

View File

@ -9,7 +9,7 @@ from config.object_ids import get_object_ids
from pus_tc.system.tcs import pack_tcs_sys_commands from pus_tc.system.tcs import pack_tcs_sys_commands
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_11_tc_sched import ( from tmtccmd.tc.pus_11_tc_sched import (
generate_time_tagged_cmd, generate_time_tagged_cmd,
generate_enable_tc_sched_cmd, generate_enable_tc_sched_cmd,
@ -122,7 +122,7 @@ class GenericHkListeningCfg:
return GenericHkListeningCfg(False, False, False) return GenericHkListeningCfg(False, False, False)
def generic_print(q: QueueHelper, info: dict): def generic_print(q: DefaultPusQueueHelper, info: dict):
q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})") q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})")
@ -138,7 +138,7 @@ def add_proc_cmds(defs: TmTcDefWrapper):
def pack_generic_hk_listening_cmds( def pack_generic_hk_listening_cmds(
q: QueueHelper, q: DefaultPusQueueHelper,
proc_key: str, proc_key: str,
sid_list: list[bytearray], sid_list: list[bytearray],
diag_list: list[bool], diag_list: list[bool],
@ -205,7 +205,7 @@ def pack_generic_hk_listening_cmds(
diag_list.clear() diag_list.clear()
def pack_proc_commands(q: QueueHelper, op_code: str): def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
sid_list = [] sid_list = []
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
if op_code in OpCodes.RESET_SCHED: if op_code in OpCodes.RESET_SCHED:
@ -695,7 +695,7 @@ def pack_proc_commands(q: QueueHelper, op_code: str):
def enable_listen_to_hk_for_x_seconds( def enable_listen_to_hk_for_x_seconds(
q: QueueHelper, q: DefaultPusQueueHelper,
diag: bool, diag: bool,
device: str, device: str,
sid: bytes, sid: bytes,
@ -710,7 +710,7 @@ def enable_listen_to_hk_for_x_seconds(
def gen_disable_listen_to_hk_for_x_seconds( def gen_disable_listen_to_hk_for_x_seconds(
q: QueueHelper, q: DefaultPusQueueHelper,
diag: bool, diag: bool,
device: str, device: str,
sid: bytes, sid: bytes,
@ -720,7 +720,7 @@ def gen_disable_listen_to_hk_for_x_seconds(
def activate_mgts_alternately( def activate_mgts_alternately(
q: QueueHelper, q: DefaultPusQueueHelper,
): ):
q.add_pus_tc( q.add_pus_tc(
@ -790,7 +790,9 @@ def activate_mgts_alternately(
q.add_wait_seconds(40.0) q.add_wait_seconds(40.0)
def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int): def rw_speed_cmd_single(
q: DefaultPusQueueHelper, oid: bytes, speed: int, ramp_time: int
):
q.add_pus_tc( q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time) pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
) )
@ -799,7 +801,7 @@ def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int):
def rw_speed_up_cmd_consec( def rw_speed_up_cmd_consec(
q: QueueHelper, obids: List[bytes], speed: int, ramp_time: int q: DefaultPusQueueHelper, obids: List[bytes], speed: int, ramp_time: int
): ):
for oid in obids: for oid in obids:
q.add_pus_tc( q.add_pus_tc(
@ -807,7 +809,9 @@ def rw_speed_up_cmd_consec(
) )
def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int): def rw_speed_down_cmd_consec(
q: DefaultPusQueueHelper, obids: List[bytes], ramp_time: int
):
for oid in obids: for oid in obids:
q.add_pus_tc( q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time) pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)
@ -815,7 +819,7 @@ def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int)
def activate_all_rws_in_sequence( def activate_all_rws_in_sequence(
q: QueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int q: DefaultPusQueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int
): ):
new_ssc = init_ssc new_ssc = init_ssc
# RW1 speed cmd # RW1 speed cmd
@ -831,7 +835,7 @@ def activate_all_rws_in_sequence(
return new_ssc return new_ssc
def activate_all_rws_two_consecutively(q: QueueHelper): def activate_all_rws_two_consecutively(q: DefaultPusQueueHelper):
# RW1+3 speed cmd # RW1+3 speed cmd
q.add_wait_seconds(5.0) q.add_wait_seconds(5.0)
rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000) rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000)

View File

@ -1,4 +1,4 @@
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from .common import command_mode from .common import command_mode
@ -15,7 +15,7 @@ class Info:
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
def pack_tcs_sys_commands(q: QueueHelper, op_code: str): def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL: if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
command_mode( command_mode(
object_id=TCS_BOARD_ASS_ID, object_id=TCS_BOARD_ASS_ID,

View File

@ -3,7 +3,7 @@ from datetime import datetime
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -16,7 +16,7 @@ class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format" SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(q: QueueHelper): def pack_set_current_time_ascii_command(q: DefaultPusQueueHelper):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0" time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii") current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}") LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")

View File

@ -2,7 +2,11 @@ import enum
import struct import struct
from pus_tm.defs import PrintWrapper from pus_tm.defs import PrintWrapper
from pus_tm.tcp_server_objects import * from pus_tm.tcp_server_objects import (
tcp_server_sensor_temperatures,
tcp_server_device_temperatures,
)
from pus_tm.tm_tcp_server import TmTcpServer
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
@ -16,6 +20,10 @@ class SetIds(enum.IntEnum):
def handle_thermal_controller_hk_data( def handle_thermal_controller_hk_data(
object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes object_id: ObjectIdU32, printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes
): ):
if tcp_server_sensor_temperatures is None:
tcp_server_sensor_temperatures = TmTcpServer("localhost", 7305)
if tcp_server_device_temperatures is None:
tcp_server_device_temperatures = TmTcpServer("localhost", 7306)
if set_id == SetIds.SENSOR_TEMPERATURE_SET: if set_id == SetIds.SENSOR_TEMPERATURE_SET:
pw = PrintWrapper(printer) pw = PrintWrapper(printer)
pw.dlog("Received sensor temperature data") pw.dlog("Received sensor temperature data")

View File

@ -1,4 +1,6 @@
from typing import Optional
from pus_tm.tm_tcp_server import TmTcpServer from pus_tm.tm_tcp_server import TmTcpServer
tcp_server_sensor_temperatures = TmTcpServer("localhost", 7305) tcp_server_sensor_temperatures: Optional[TmTcpServer] = None
tcp_server_device_temperatures = TmTcpServer("localhost", 7306) tcp_server_device_temperatures: Optional[TmTcpServer] = None

View File

@ -4,6 +4,8 @@ import sys
import time import time
import traceback import traceback
from tmtccmd.tc.handler import SendCbParams
try: try:
import spacepackets import spacepackets
except ImportError as error: except ImportError as error:
@ -26,7 +28,6 @@ except ImportError as error:
from spacepackets.ecss import PusVerificator from spacepackets.ecss import PusVerificator
from tmtccmd import get_console_logger, TcHandlerBase, BackendBase from tmtccmd import get_console_logger, TcHandlerBase, BackendBase
from tmtccmd.com_if import ComInterface
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
@ -43,8 +44,8 @@ from tmtccmd.tc import (
ProcedureHelper, ProcedureHelper,
FeedWrapper, FeedWrapper,
TcProcedureType, TcProcedureType,
QueueEntryHelper,
TcQueueEntryType, TcQueueEntryType,
DefaultPusQueueHelper,
) )
from tmtccmd.config import default_json_path, SetupWrapper from tmtccmd.config import default_json_path, SetupWrapper
from tmtccmd.config.args import ( from tmtccmd.config.args import (
@ -88,7 +89,7 @@ class TcHandler(TcHandlerBase):
pus_verificator: PusVerificator, pus_verificator: PusVerificator,
file_logger: logging.Logger, file_logger: logging.Logger,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
gui: bool gui: bool,
): ):
super().__init__() super().__init__()
self.seq_count_provider = seq_count_provider self.seq_count_provider = seq_count_provider
@ -96,12 +97,22 @@ class TcHandler(TcHandlerBase):
self.file_logger = file_logger self.file_logger = file_logger
self.raw_logger = raw_logger self.raw_logger = raw_logger
self.gui = gui self.gui = gui
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=None,
pus_apid=PUS_APID,
seq_cnt_provider=seq_count_provider,
pus_verificator=pus_verificator,
)
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper): def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT: if info.proc_type == TcProcedureType.DEFAULT:
handle_default_procedure(info.to_def_procedure(), wrapper, self.gui) handle_default_procedure(
info.to_def_procedure(), self.queue_helper, self.gui
)
def send_cb(self, entry_helper: QueueEntryHelper, com_if: ComInterface): def send_cb(self, send_params: SendCbParams):
entry_helper = send_params.entry
if entry_helper.is_tc: if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC: if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry() pus_tc_wrapper = entry_helper.to_pus_tc_entry()
@ -118,7 +129,7 @@ class TcHandler(TcHandlerBase):
self.file_logger.info( self.file_logger.info(
f"{get_current_time_string(True)}: {tc_info_string}" f"{get_current_time_string(True)}: {tc_info_string}"
) )
com_if.send(raw_tc) send_params.com_if.send(raw_tc)
elif entry_helper.entry_type == TcQueueEntryType.LOG: elif entry_helper.entry_type == TcQueueEntryType.LOG:
log_entry = entry_helper.to_log_entry() log_entry = entry_helper.to_log_entry()
LOGGER.info(log_entry.log_str) LOGGER.info(log_entry.log_str)
@ -150,7 +161,7 @@ def setup_tmtc(
verificator: PusVerificator, verificator: PusVerificator,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
gui: bool gui: bool,
) -> (CcsdsTmHandler, TcHandler): ) -> (CcsdsTmHandler, TcHandler):
verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger) verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger)
pus_handler = PusHandler(verification_wrapper, printer, raw_logger) pus_handler = PusHandler(verification_wrapper, printer, raw_logger)
@ -162,7 +173,7 @@ def setup_tmtc(
pus_verificator=verificator, pus_verificator=verificator,
file_logger=printer.file_logger, file_logger=printer.file_logger,
raw_logger=raw_logger, raw_logger=raw_logger,
gui=gui gui=gui,
) )
return ccsds_handler, tc_handler return ccsds_handler, tc_handler