update dependencies

This commit is contained in:
Robin Müller 2022-08-08 16:32:18 +02:00
parent 6efd7a4ef1
commit 76da5753f6
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
38 changed files with 170 additions and 112 deletions

View File

@ -15,7 +15,7 @@
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s 17 -o 0 -d 3" /> <option name="PARAMETERS" value="-s 17 -o 0 -d 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" /> <option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />

View File

@ -18,13 +18,17 @@ class EiveHookObject(TmTcCfgHookBase):
return get_eive_service_op_code_dict() return get_eive_service_op_code_dict()
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com_if import create_communication_interface_default from tmtccmd.config.com_if import (
create_com_interface_default,
create_com_interface_cfg_default,
)
return create_communication_interface_default( cfg = create_com_interface_cfg_default(
com_if_key=com_if_key, com_if_key=com_if_key,
json_cfg_path=self.json_cfg_path, json_cfg_path=self.json_cfg_path,
space_packet_ids=SPACE_PACKET_IDS, space_packet_ids=SPACE_PACKET_IDS,
) )
return create_com_interface_default(cfg)
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int): def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
from config.custom_mode_op import custom_mode_operation from config.custom_mode_op import custom_mode_operation

2
deps/spacepackets vendored

@ -1 +1 @@
Subproject commit d838cbdd614a69d1f8c7d6126206f44fd8b44aa7 Subproject commit 13a54713ae283faf0b272dc6c1373ed459efb9b6

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 8ac7e1793178dfcd000ccdc2cfee3e90aff53a0d Subproject commit a8dda1113cd1f70e984915cff920a261de942adf

View File

@ -167,7 +167,7 @@ def add_bpx_cmd_definitions(defs: TmTcDefWrapper):
) )
oce.add(keys=BpxOpCodes.REBOOT, info="Reboot Command") oce.add(keys=BpxOpCodes.REBOOT, info="Reboot Command")
defs.add_service( defs.add_service(
name=CustomServiceList.BPX_BATTERY.value, name=CustomServiceList.BPX_BATTERY,
info="BPX Battery Handler", info="BPX Battery Handler",
op_code_entry=oce, op_code_entry=oce,
) )

View File

@ -8,7 +8,7 @@ import struct
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -78,7 +78,7 @@ def add_acu_cmds(defs: TmTcDefWrapper):
) )
def pack_acu_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Handling ACU command") q.add_log_cmd("Handling ACU command")
if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I: if op_code in GomspaceOpCodes.PRINT_SWITCH_V_I:
q.add_log_cmd("ACU: Print channel stats") q.add_log_cmd("ACU: Print channel stats")
@ -143,7 +143,7 @@ class ACUTestProcedure:
off = False off = False
def pack_test_cmds(object_id: ObjectIdU32, q: QueueHelper): def pack_test_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper):
if ACUTestProcedure.all or ACUTestProcedure.reboot: if ACUTestProcedure.all or ACUTestProcedure.reboot:
q.add_log_cmd("ACU: Reboot") q.add_log_cmd("ACU: Reboot")
q.add_pus_tc(gs.pack_reboot_command(object_id)) q.add_pus_tc(gs.pack_reboot_command(object_id))

View File

@ -1,5 +1,5 @@
from config.object_ids import BPX_HANDLER_ID from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
@ -24,7 +24,7 @@ class BpxOpCodes:
REBOOT = ["4", "reboot"] REBOOT = ["4", "reboot"]
def pack_bpx_commands(q: QueueHelper, op_code: str): def pack_bpx_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in BpxOpCodes.HK: if op_code in BpxOpCodes.HK:
q.add_log_cmd("Requesting BPX battery HK set") q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET) sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)

View File

@ -8,7 +8,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -32,7 +32,9 @@ class CommandIds:
UPDATE_ON_FALLING_EDGE = 8 UPDATE_ON_FALLING_EDGE = 8
def pack_ccsds_handler_test(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ccsds_handler_test(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
obyt = object_id.as_bytes obyt = object_id.as_bytes
q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing CCSDS handler with object id: {object_id.as_hex_string}")
if op_code == "0": if op_code == "0":

View File

@ -2,7 +2,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
@ -35,7 +35,7 @@ def add_gps_cmds(defs: TmTcDefWrapper):
) )
def pack_gps_command(object_id: bytes, q: QueueHelper, op_code: str): def pack_gps_command(object_id: bytes, q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.RESET_GNSS: if op_code in OpCodes.RESET_GNSS:
# TODO: This needs to be re-implemented # TODO: This needs to be re-implemented
LOGGER.warning("Reset pin handling needs to be re-implemented") LOGGER.warning("Reset pin handling needs to be re-implemented")

View File

@ -8,7 +8,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from config.object_ids import get_object_ids from config.object_ids import get_object_ids
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util.obj_id import ObjectIdU32 from tmtccmd.util.obj_id import ObjectIdU32
from tmtccmd.tc.pus_201_fsfw_health import ( from tmtccmd.tc.pus_201_fsfw_health import (
pack_set_health_cmd_data, pack_set_health_cmd_data,
@ -66,7 +66,7 @@ def add_heater_cmds(defs: TmTcDefWrapper):
) )
def pack_heater_cmds(object_id: bytearray, op_code: str, q: QueueHelper): def pack_heater_cmds(object_id: bytearray, op_code: str, q: DefaultPusQueueHelper):
if op_code in OpCodes.HEATER_CMD: if op_code in OpCodes.HEATER_CMD:
q.add_log_cmd("Heater Switching") q.add_log_cmd("Heater Switching")
heater_number = prompt_heater() heater_number = prompt_heater()
@ -171,7 +171,7 @@ def prompt_heater() -> int:
def health_cmd( def health_cmd(
q: QueueHelper, q: DefaultPusQueueHelper,
heater_idx: int, heater_idx: int,
object_id: ObjectIdU32, object_id: ObjectIdU32,
health: FsfwHealth, health: FsfwHealth,

View File

@ -8,7 +8,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -44,7 +44,7 @@ class ImtqActionIds:
read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D]) read_self_test_results = bytearray([0x0, 0x0, 0x0, 0x0D])
def pack_imtq_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_imtq_test_into(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd( q.add_log_cmd(
f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}" f"Testing ISIS IMTQ handler with object id: {object_id.as_hex_string}"
) )

View File

@ -5,7 +5,7 @@
@author J. Meier @author J. Meier
@date 13.12.2020 @date 13.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from gomspace.gomspace_common import * from gomspace.gomspace_common import *
from config.object_ids import P60_DOCK_HANDLER from config.object_ids import P60_DOCK_HANDLER
@ -82,7 +82,7 @@ class P60DockHkTable:
wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size) wdt_gnd_left = TableEntry(bytearray([0x00, 0xA8]), TableEntry.uint32_size)
def pack_p60dock_cmds(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code in P60OpCodes.STACK_3V3_ON: if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(Info.STACK_3V3_ON) q.add_log_cmd(Info.STACK_3V3_ON)

View File

@ -6,7 +6,7 @@
@date 22.11.2021 @date 22.11.2021
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
class CommandIds: class CommandIds:
@ -16,7 +16,9 @@ class CommandIds:
PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1]) PRINT_PDEC_MON = bytearray([0x0, 0x0, 0x0, 0x1])
def pack_pdec_handler_test(object_id: bytearray, q: QueueHelper, op_code: str): def pack_pdec_handler_test(
object_id: bytearray, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Testing PDEC handler with object id: {object_id.hex()}") q.add_log_cmd(f"Testing PDEC handler with object id: {object_id.hex()}")
if op_code == "0": if op_code == "0":
q.add_log_cmd("PDEC Handler: Print CLCW") q.add_log_cmd("PDEC Handler: Print CLCW")

View File

@ -5,7 +5,7 @@
""" """
import gomspace.gomspace_common as gs import gomspace.gomspace_common as gs
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
make_sid, make_sid,
@ -55,7 +55,7 @@ class PDU1TestProcedure:
turn_channel_3_off = False turn_channel_3_off = False
def pack_pdu1_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Commanding PDU1") q.add_log_cmd("Commanding PDU1")
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code == Pdu1OpCodes.TCS_BOARD_ON.value: if op_code == Pdu1OpCodes.TCS_BOARD_ON.value:

View File

@ -6,7 +6,7 @@
@author J. Meier @author J. Meier
@date 17.12.2020 @date 17.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
generate_one_diag_command, generate_one_diag_command,
@ -65,7 +65,7 @@ class PDU2TestProcedure:
request_hk_table = False request_hk_table = False
def pack_pdu2_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_pdu2_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
q.add_log_cmd("Testing PDU2") q.add_log_cmd("Testing PDU2")
objb = object_id.as_bytes objb = object_id.as_bytes
if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value: if op_code == Pdu2OpCodes.ACS_SIDE_B_ON.value:

View File

@ -9,7 +9,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -17,7 +17,9 @@ class ActionIds:
DUMP_MRAM = 1 DUMP_MRAM = 1
def pack_ploc_memory_dumper_cmd(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_memory_dumper_cmd(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}" f"Testing PLOC memory dumper with object id: {object_id.as_hex_string}"
) )

View File

@ -11,7 +11,7 @@ import enum
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
@ -66,7 +66,9 @@ class PlocReplyIds(enum.IntEnum):
TM_CAM_CMD_RPT = 19 TM_CAM_CMD_RPT = 19
def pack_ploc_mpsoc_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_mpsoc_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}" f"Generate command for PLOC MPSoC with object id: {object_id.as_hex_string}"
) )

View File

@ -10,7 +10,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
@ -106,7 +106,9 @@ class SupvHkIds:
BOOT_STATUS_REPORT = 53 BOOT_STATUS_REPORT = 53
def pack_ploc_supv_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_ploc_supv_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}")
obyt = object_id.as_bytes obyt = object_id.as_bytes
if op_code == "0": if op_code == "0":

View File

@ -7,7 +7,7 @@ from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper from tmtccmd.config import TmTcDefWrapper
from tmtccmd.config.tmtc import OpCodeEntry from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
make_sid, make_sid,
generate_one_diag_command, generate_one_diag_command,
@ -146,10 +146,10 @@ def add_pl_pcdu_cmds(defs: TmTcDefWrapper):
info="Inject failure in MPA to HPA transition", info="Inject failure in MPA to HPA transition",
) )
oce.add(keys=OpCodes.INJECT_ALL_ON_FAILURE, info="Inject failure in all on mode") oce.add(keys=OpCodes.INJECT_ALL_ON_FAILURE, info="Inject failure in all on mode")
defs.add_service(CustomServiceList.PL_PCDU.value, "PL PCDU", oce) defs.add_service(CustomServiceList.PL_PCDU, "PL PCDU", oce)
def pack_pl_pcdu_commands(q: QueueHelper, op_code: str): def pack_pl_pcdu_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.SWITCH_ON: if op_code in OpCodes.SWITCH_ON:
pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0) pack_pl_pcdu_mode_cmd(q=q, info=Info.SWITCH_ON, mode=Modes.ON, submode=0)
if op_code in OpCodes.SWITCH_OFF: if op_code in OpCodes.SWITCH_OFF:
@ -215,7 +215,7 @@ def pack_pl_pcdu_commands(q: QueueHelper, op_code: str):
) )
def hpa_on_procedure(q: QueueHelper): def hpa_on_procedure(q: DefaultPusQueueHelper):
delay_dro_to_x8 = request_wait_time() delay_dro_to_x8 = request_wait_time()
if delay_dro_to_x8 is None: if delay_dro_to_x8 is None:
delay_dro_to_x8 = 900 delay_dro_to_x8 = 900
@ -389,7 +389,7 @@ def submode_mask_to_submode(on_tgt: NormalSubmodesMask) -> int:
) )
def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str): def pack_wait_time_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
wait_time = request_wait_time() wait_time = request_wait_time()
q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}") q.add_log_cmd(f"Updating {print_str} wait time to {wait_time}")
if wait_time is None: if wait_time is None:
@ -403,7 +403,7 @@ def pack_wait_time_cmd(q: QueueHelper, param_id: int, print_str: str):
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str): def pack_failure_injection_cmd(q: DefaultPusQueueHelper, param_id: int, print_str: str):
q.add_log_cmd(f"Inserting {print_str} error") q.add_log_cmd(f"Inserting {print_str} error")
param_data = pack_boolean_parameter_app_data( param_data = pack_boolean_parameter_app_data(
object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True object_id=PL_PCDU_ID, domain_id=0, unique_id=param_id, parameter=True
@ -411,7 +411,9 @@ def pack_failure_injection_cmd(q: QueueHelper, param_id: int, print_str: str):
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data)) q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=param_data))
def pack_pl_pcdu_mode_cmd(q: QueueHelper, info: str, mode: Modes, submode: int): def pack_pl_pcdu_mode_cmd(
q: DefaultPusQueueHelper, info: str, mode: Modes, submode: int
):
q.add_log_cmd(info) q.add_log_cmd(info)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode) mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
q.add_pus_tc( q.add_pus_tc(

View File

@ -12,7 +12,7 @@ from config.definitions import CustomServiceList
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data, Modes from pus_tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -61,7 +61,9 @@ def add_rad_sens_cmds(defs: TmTcDefWrapper):
) )
def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_rad_sensor_test_into(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}") q.add_log_cmd(f"Commanding Radiation sensor handler {object_id}")
if op_code in OpCodes.ON: if op_code in OpCodes.ON:
@ -87,7 +89,9 @@ def pack_rad_sensor_test_into(object_id: ObjectIdU32, q: QueueHelper, op_code: s
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command)) q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=command))
def rad_sensor_mode_cmd(object_id: ObjectIdU32, mode: Modes, info: str, q: QueueHelper): def rad_sensor_mode_cmd(
object_id: ObjectIdU32, mode: Modes, info: str, q: DefaultPusQueueHelper
):
q.add_log_cmd(f"Rad sensor: {info}") q.add_log_cmd(f"Rad sensor: {info}")
mode_data = pack_mode_data(object_id.as_bytes, mode, 0) mode_data = pack_mode_data(object_id.as_bytes, mode, 0)
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data)) q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -7,7 +7,7 @@
import struct import struct
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import ( from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command, generate_one_hk_command,
generate_one_diag_command, generate_one_diag_command,
@ -115,7 +115,7 @@ def add_rw_cmds(defs: TmTcDefWrapper):
def pack_single_rw_test_into( def pack_single_rw_test_into(
object_id: bytes, rw_idx: int, q: QueueHelper, op_code: str object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
): ):
if op_code in OpCodesDevs.SPEED: if op_code in OpCodesDevs.SPEED:
speed = int(input("Specify speed [0.1 RPM]: ")) speed = int(input("Specify speed [0.1 RPM]: "))
@ -154,7 +154,7 @@ def pack_single_rw_test_into(
) )
def pack_rw_ass_cmds(q: QueueHelper, object_id: bytes, op_code: str): def pack_rw_ass_cmds(q: DefaultPusQueueHelper, object_id: bytes, op_code: str):
if op_code in OpCodesAss.OFF: if op_code in OpCodesAss.OFF:
data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0) data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
q.add_pus_tc( q.add_pus_tc(

View File

@ -5,7 +5,7 @@ from pus_tc.devs.pdec_handler import CommandIds
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data, Subservices
import config.object_ids as oids import config.object_ids as oids
@ -55,7 +55,9 @@ def specify_rtd_cmds(defs: TmTcDefWrapper):
) )
def pack_rtd_commands(op_code: str, object_id: Optional[ObjectIdU32], q: QueueHelper): def pack_rtd_commands(
op_code: str, object_id: Optional[ObjectIdU32], q: DefaultPusQueueHelper
):
if object_id is not None and object_id not in RTD_IDS: if object_id is not None and object_id not in RTD_IDS:
print("Specified object ID not a valid RTD ID") print("Specified object ID not a valid RTD ID")
object_id = None object_id = None

View File

@ -2,7 +2,7 @@ import enum
import json import json
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.config import OpCodeEntry, TmTcDefWrapper from tmtccmd.config import OpCodeEntry, TmTcDefWrapper
from config.object_ids import SCEX_HANDLER_ID from config.object_ids import SCEX_HANDLER_ID
@ -56,13 +56,11 @@ def add_scex_cmds(defs: TmTcDefWrapper):
oce.add(keys=OpCodes.FRAM, info=Info.FRAM) oce.add(keys=OpCodes.FRAM, info=Info.FRAM)
defs.add_service( defs.add_service(
name=CustomServiceList.SCEX.value, name=CustomServiceList.SCEX.value, info="SCEX Device", op_code_entry=oce
info="SCEX Device",
op_code_entry=oce
) )
def pack_scex_cmds(q: QueueHelper, op_code: str): def pack_scex_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.PING: if op_code in OpCodes.PING:
q.add_log_cmd(Info.PING) q.add_log_cmd(Info.PING)
app_data = bytes([0]) app_data = bytes([0])
@ -70,17 +68,23 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
if op_code in OpCodes.ION_CMD: if op_code in OpCodes.ION_CMD:
q.add_log_cmd(Info.ION_CMD) q.add_log_cmd(Info.ION_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ION_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ION_CMD, app_data)
)
if op_code in OpCodes.TEMP_CMD: if op_code in OpCodes.TEMP_CMD:
q.add_log_cmd(Info.TEMP_CMD) q.add_log_cmd(Info.TEMP_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.TEMP_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.TEMP_CMD, app_data)
)
if op_code in OpCodes.EXP_STATUS_CMD: if op_code in OpCodes.EXP_STATUS_CMD:
q.add_log_cmd(Info.EXP_STATUS_CMD) q.add_log_cmd(Info.EXP_STATUS_CMD)
app_data = bytes([0]) app_data = bytes([0])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.EXP_STATUS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.EXP_STATUS_CMD, app_data)
)
# one cell # one cell
if op_code in OpCodes.ONE_CELLS_CMD: if op_code in OpCodes.ONE_CELLS_CMD:
@ -126,7 +130,9 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
app_data.append(dac_weight2[cn]) app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn]) app_data.append(dac_weight3[cn])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ONE_CELLS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ONE_CELLS_CMD, app_data)
)
if op_code in OpCodes.ALL_CELLS_CMD: if op_code in OpCodes.ALL_CELLS_CMD:
q.add_log_cmd(Info.ALL_CELLS_CMD) q.add_log_cmd(Info.ALL_CELLS_CMD)
@ -156,7 +162,9 @@ def pack_scex_cmds(q: QueueHelper, op_code: str):
app_data.append(dac_weight2[cn]) app_data.append(dac_weight2[cn])
app_data.append(dac_weight3[cn]) app_data.append(dac_weight3[cn])
q.add_pus_tc(generate_action_command(SCEX_HANDLER_ID, ActionIds.ALL_CELLS_CMD, app_data)) q.add_pus_tc(
generate_action_command(SCEX_HANDLER_ID, ActionIds.ALL_CELLS_CMD, app_data)
)
if op_code in OpCodes.FRAM: if op_code in OpCodes.FRAM:
q.add_log_cmd(Info.FRAM) q.add_log_cmd(Info.FRAM)

View File

@ -6,14 +6,16 @@
@date 15.02.2021 @date 15.02.2021
""" """
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
class ActionIds: class ActionIds:
DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5]) DEPLOY_SOLAR_ARRAYS = bytearray([0x0, 0x0, 0x0, 0x5])
def pack_solar_array_deployment_test_into(object_id: bytearray, q: QueueHelper): def pack_solar_array_deployment_test_into(
object_id: bytearray, q: DefaultPusQueueHelper
):
q.add_log_cmd("Testing S/A Deployment") q.add_log_cmd("Testing S/A Deployment")
command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS command = object_id + ActionIds.DEPLOY_SOLAR_ARRAYS

View File

@ -11,7 +11,7 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from utility.input_helper import InputHelper from utility.input_helper import InputHelper
@ -150,7 +150,9 @@ class Submode:
FIRMWARE = 2 FIRMWARE = 2
def pack_star_tracker_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_star_tracker_commands(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Generate command for star tracker with object id: {object_id.as_hex_string}" f"Generate command for star tracker with object id: {object_id.as_hex_string}"
) )

View File

@ -11,7 +11,7 @@
import struct import struct
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -24,7 +24,9 @@ class ImagePathDefs:
uploadFile = "/mnt/sd0/startracker/gemma.bin" uploadFile = "/mnt/sd0/startracker/gemma.bin"
def pack_str_img_helper_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_str_img_helper_command(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
q.add_log_cmd( q.add_log_cmd(
f"Testing star tracker image helper object id: {object_id.as_hex_string}" f"Testing star tracker image helper object id: {object_id.as_hex_string}"
) )

View File

@ -5,7 +5,7 @@
@author J. Meier @author J. Meier
@date 13.12.2020 @date 13.12.2020
""" """
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
@ -37,7 +37,9 @@ class CommandIds:
DISABLE_DEBUG = 21 DISABLE_DEBUG = 21
def pack_syrlinks_command(object_id: ObjectIdU32, q: QueueHelper, op_code: str): def pack_syrlinks_command(
object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str
):
obyt = object_id.as_bytes obyt = object_id.as_bytes
q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}") q.add_log_cmd(f"Testing Syrlinks with object id: {object_id.as_hex_string}")
if op_code == "0": if op_code == "0":

View File

@ -7,7 +7,7 @@
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data from pus_tc.service_200_mode import pack_mode_data
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -33,7 +33,9 @@ class Tmp1075ActionIds:
start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02]) start_adc_conversion = bytearray([0x0, 0x0, 0x0, 0x02])
def pack_tmp1075_test_into(object_id: ObjectIdU32, op_code: str, q: QueueHelper): def pack_tmp1075_test_into(
object_id: ObjectIdU32, op_code: str, q: DefaultPusQueueHelper
):
q.add_log_cmd( q.add_log_cmd(
f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}" f"Testing Tmp1075 Temperature Sensor Handler with object id: {object_id.as_hex_string}"
) )

View File

@ -4,11 +4,14 @@ from typing import cast
from pus_tc.devs.rtd import pack_rtd_commands from pus_tc.devs.rtd import pack_rtd_commands
from pus_tc.devs.scex import pack_scex_cmds from pus_tc.devs.scex import pack_scex_cmds
from pus_tc.system.controllers import pack_cmd_ctrl_to_prompted_mode, get_object_from_op_code from pus_tc.system.controllers import (
pack_cmd_ctrl_to_prompted_mode,
get_object_from_op_code,
)
from tmtccmd import DefaultProcedureInfo from tmtccmd import DefaultProcedureInfo
from tmtccmd.config import CoreServiceList from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import FeedWrapper from tmtccmd.tc import FeedWrapper, DefaultPusQueueHelper
from tmtccmd.tc.pus_5_event import ( from tmtccmd.tc.pus_5_event import (
pack_generic_service_5_test_into, pack_generic_service_5_test_into,
) )
@ -73,8 +76,9 @@ from tmtccmd.util import ObjectIdU32
LOGGER = get_console_logger() LOGGER = get_console_logger()
def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper, gui: bool): def handle_default_procedure(
queue_helper = wrapper.queue_helper info: DefaultProcedureInfo, queue_helper: DefaultPusQueueHelper, gui: bool
):
service = info.service service = info.service
op_code = info.op_code op_code = info.op_code
obj_id_man = get_object_ids() obj_id_man = get_object_ids()
@ -199,9 +203,7 @@ def handle_default_procedure(info: DefaultProcedureInfo, wrapper: FeedWrapper, g
if service == CustomServiceList.TIME.value: if service == CustomServiceList.TIME.value:
return pack_set_current_time_ascii_command(q=queue_helper) return pack_set_current_time_ascii_command(q=queue_helper)
if service == CustomServiceList.RW_ASSEMBLY.value: if service == CustomServiceList.RW_ASSEMBLY.value:
return pack_rw_ass_cmds( return pack_rw_ass_cmds(q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code)
q=queue_helper, object_id=RW_ASSEMBLY, op_code=op_code
)
if service == CustomServiceList.CONTROLLERS.value: if service == CustomServiceList.CONTROLLERS.value:
return pack_cmd_ctrl_to_prompted_mode( return pack_cmd_ctrl_to_prompted_mode(
q=queue_helper, object_id=get_object_from_op_code(op_code), gui=gui q=queue_helper, object_id=get_object_from_op_code(op_code), gui=gui

View File

@ -6,14 +6,14 @@
@date 02.05.2020 @date 02.05.2020
""" """
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from config.object_ids import TEST_DEVICE_ID from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(q: QueueHelper): def pack_service200_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 200") q.add_log_cmd("Testing Service 200")
# Object ID: Dummy Device # Object ID: Dummy Device
obj_id = TEST_DEVICE_OBJ_ID obj_id = TEST_DEVICE_OBJ_ID

View File

@ -1,6 +1,6 @@
import enum import enum
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
@ -30,7 +30,7 @@ class DualSideSubmodes(enum.IntEnum):
DUAL_SIDE = 2 DUAL_SIDE = 2
def pack_acs_command(q: QueueHelper, op_code: str): def pack_acs_command(q: DefaultPusQueueHelper, op_code: str):
if op_code in AcsOpCodes.ACS_ASS_A_SIDE: if op_code in AcsOpCodes.ACS_ASS_A_SIDE:
command_mode( command_mode(
object_id=ACS_BOARD_ASS_ID, object_id=ACS_BOARD_ASS_ID,
@ -89,7 +89,7 @@ def pack_acs_command(q: QueueHelper, op_code: str):
) )
def pack_sus_cmds(q: QueueHelper, op_code: str): def pack_sus_cmds(q: DefaultPusQueueHelper, op_code: str):
if op_code in SusOpCodes.SUS_ASS_NOM_SIDE: if op_code in SusOpCodes.SUS_ASS_NOM_SIDE:
command_mode( command_mode(
object_id=SUS_BOARD_ASS_ID, object_id=SUS_BOARD_ASS_ID,

View File

@ -1,7 +1,7 @@
from typing import Union from typing import Union
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
@ -9,7 +9,7 @@ def command_mode(
object_id: bytes, object_id: bytes,
mode: Union[int, Modes], mode: Union[int, Modes],
submode: int, submode: int,
q: QueueHelper, q: DefaultPusQueueHelper,
info: str, info: str,
): ):
q.add_log_cmd(info) q.add_log_cmd(info)

View File

@ -1,4 +1,4 @@
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -18,7 +18,9 @@ class Info:
CORE_CONTROLLER = "ACS controller" CORE_CONTROLLER = "ACS controller"
def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui: bool): def pack_cmd_ctrl_to_prompted_mode(
q: DefaultPusQueueHelper, object_id: ObjectIdU32, gui: bool
):
param_list = [ param_list = [
{"name": "Mode", "defaultValue": "2"}, {"name": "Mode", "defaultValue": "2"},
{"name": "Submode", "defaultValue": "0"}, {"name": "Submode", "defaultValue": "0"},
@ -41,7 +43,7 @@ def pack_cmd_ctrl_to_prompted_mode(q: QueueHelper, object_id: ObjectIdU32, gui:
) )
def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_off(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.OFF, mode=Modes.OFF,
@ -51,7 +53,7 @@ def pack_cmd_ctrl_to_off(q: QueueHelper, object_id: ObjectIdU32):
) )
def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_on(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.ON, mode=Modes.ON,
@ -61,7 +63,7 @@ def pack_cmd_ctrl_to_on(q: QueueHelper, object_id: ObjectIdU32):
) )
def pack_cmd_ctrl_to_nml(q: QueueHelper, object_id: ObjectIdU32): def pack_cmd_ctrl_to_nml(q: DefaultPusQueueHelper, object_id: ObjectIdU32):
command_mode( command_mode(
object_id=object_id.as_bytes, object_id=object_id.as_bytes,
mode=Modes.NORMAL, mode=Modes.NORMAL,

View File

@ -3,7 +3,7 @@ import enum
from config.definitions import CustomServiceList from config.definitions import CustomServiceList
from tmtccmd.config import TmTcDefWrapper from tmtccmd.config import TmTcDefWrapper
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_8_funccmd import generate_action_command from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
@ -109,10 +109,10 @@ def add_core_controller_definitions(defs: TmTcDefWrapper):
keys=OpCodes.RESET_REBOOT_COUNTER_11, keys=OpCodes.RESET_REBOOT_COUNTER_11,
info="Reset reboot counter 1 1", info="Reset reboot counter 1 1",
) )
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) defs.add_service(CustomServiceList.CORE, "Core Controller", oce)
def pack_core_commands(q: QueueHelper, op_code: str): def pack_core_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.REBOOT_XSC: if op_code in OpCodes.REBOOT_XSC:
reboot_self, chip_select, copy_select = determine_reboot_params() reboot_self, chip_select, copy_select = determine_reboot_params()
perform_reboot_cmd( perform_reboot_cmd(
@ -231,7 +231,7 @@ def determine_reboot_params() -> (bool, Chip, Copy):
def perform_reboot_cmd( def perform_reboot_cmd(
q: QueueHelper, q: DefaultPusQueueHelper,
reboot_self: bool, reboot_self: bool,
chip: Chip = Chip.NONE, chip: Chip = Chip.NONE,
copy: Copy = Copy.NONE, copy: Copy = Copy.NONE,

View File

@ -9,7 +9,7 @@ from config.object_ids import get_object_ids
from pus_tc.system.tcs import pack_tcs_sys_commands from pus_tc.system.tcs import pack_tcs_sys_commands
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_11_tc_sched import ( from tmtccmd.tc.pus_11_tc_sched import (
generate_time_tagged_cmd, generate_time_tagged_cmd,
generate_enable_tc_sched_cmd, generate_enable_tc_sched_cmd,
@ -122,7 +122,7 @@ class GenericHkListeningCfg:
return GenericHkListeningCfg(False, False, False) return GenericHkListeningCfg(False, False, False)
def generic_print(q: QueueHelper, info: dict): def generic_print(q: DefaultPusQueueHelper, info: dict):
q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})") q.add_log_cmd(f"Executing {info[1]} Procedure (OpCodes: {info[0]})")
@ -138,7 +138,7 @@ def add_proc_cmds(defs: TmTcDefWrapper):
def pack_generic_hk_listening_cmds( def pack_generic_hk_listening_cmds(
q: QueueHelper, q: DefaultPusQueueHelper,
proc_key: str, proc_key: str,
sid_list: list[bytearray], sid_list: list[bytearray],
diag_list: list[bool], diag_list: list[bool],
@ -205,7 +205,7 @@ def pack_generic_hk_listening_cmds(
diag_list.clear() diag_list.clear()
def pack_proc_commands(q: QueueHelper, op_code: str): def pack_proc_commands(q: DefaultPusQueueHelper, op_code: str):
sid_list = [] sid_list = []
obj_id_dict = get_object_ids() obj_id_dict = get_object_ids()
if op_code in OpCodes.RESET_SCHED: if op_code in OpCodes.RESET_SCHED:
@ -695,7 +695,7 @@ def pack_proc_commands(q: QueueHelper, op_code: str):
def enable_listen_to_hk_for_x_seconds( def enable_listen_to_hk_for_x_seconds(
q: QueueHelper, q: DefaultPusQueueHelper,
diag: bool, diag: bool,
device: str, device: str,
sid: bytes, sid: bytes,
@ -710,7 +710,7 @@ def enable_listen_to_hk_for_x_seconds(
def gen_disable_listen_to_hk_for_x_seconds( def gen_disable_listen_to_hk_for_x_seconds(
q: QueueHelper, q: DefaultPusQueueHelper,
diag: bool, diag: bool,
device: str, device: str,
sid: bytes, sid: bytes,
@ -720,7 +720,7 @@ def gen_disable_listen_to_hk_for_x_seconds(
def activate_mgts_alternately( def activate_mgts_alternately(
q: QueueHelper, q: DefaultPusQueueHelper,
): ):
q.add_pus_tc( q.add_pus_tc(
@ -790,7 +790,9 @@ def activate_mgts_alternately(
q.add_wait_seconds(40.0) q.add_wait_seconds(40.0)
def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int): def rw_speed_cmd_single(
q: DefaultPusQueueHelper, oid: bytes, speed: int, ramp_time: int
):
q.add_pus_tc( q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time) pack_set_speed_command(object_id=oid, speed=speed, ramp_time_ms=ramp_time)
) )
@ -799,7 +801,7 @@ def rw_speed_cmd_single(q: QueueHelper, oid: bytes, speed: int, ramp_time: int):
def rw_speed_up_cmd_consec( def rw_speed_up_cmd_consec(
q: QueueHelper, obids: List[bytes], speed: int, ramp_time: int q: DefaultPusQueueHelper, obids: List[bytes], speed: int, ramp_time: int
): ):
for oid in obids: for oid in obids:
q.add_pus_tc( q.add_pus_tc(
@ -807,7 +809,9 @@ def rw_speed_up_cmd_consec(
) )
def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int): def rw_speed_down_cmd_consec(
q: DefaultPusQueueHelper, obids: List[bytes], ramp_time: int
):
for oid in obids: for oid in obids:
q.add_pus_tc( q.add_pus_tc(
pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time) pack_set_speed_command(object_id=oid, speed=0, ramp_time_ms=ramp_time)
@ -815,7 +819,7 @@ def rw_speed_down_cmd_consec(q: QueueHelper, obids: List[bytes], ramp_time: int)
def activate_all_rws_in_sequence( def activate_all_rws_in_sequence(
q: QueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int q: DefaultPusQueueHelper, init_ssc: int, test_speed: int, test_ramp_time: int
): ):
new_ssc = init_ssc new_ssc = init_ssc
# RW1 speed cmd # RW1 speed cmd
@ -831,7 +835,7 @@ def activate_all_rws_in_sequence(
return new_ssc return new_ssc
def activate_all_rws_two_consecutively(q: QueueHelper): def activate_all_rws_two_consecutively(q: DefaultPusQueueHelper):
# RW1+3 speed cmd # RW1+3 speed cmd
q.add_wait_seconds(5.0) q.add_wait_seconds(5.0)
rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000) rw_speed_up_cmd_consec(q, [oids.RW1_ID, oids.RW3_ID], -20000, 10000)

View File

@ -1,4 +1,4 @@
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes from tmtccmd.tc.pus_200_fsfw_modes import Modes
from .common import command_mode from .common import command_mode
@ -15,7 +15,7 @@ class Info:
TCS_BOARD_ASS_OFF = "Switching TCS board assembly off" TCS_BOARD_ASS_OFF = "Switching TCS board assembly off"
def pack_tcs_sys_commands(q: QueueHelper, op_code: str): def pack_tcs_sys_commands(q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.TCS_BOARD_ASS_NORMAL: if op_code in OpCodes.TCS_BOARD_ASS_NORMAL:
command_mode( command_mode(
object_id=TCS_BOARD_ASS_ID, object_id=TCS_BOARD_ASS_ID,

View File

@ -3,7 +3,7 @@ from datetime import datetime
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.logging import get_console_logger from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper from tmtccmd.tc import DefaultPusQueueHelper
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -16,7 +16,7 @@ class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format" SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(q: QueueHelper): def pack_set_current_time_ascii_command(q: DefaultPusQueueHelper):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0" time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii") current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}") LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")

View File

@ -45,6 +45,7 @@ from tmtccmd.tc import (
FeedWrapper, FeedWrapper,
TcProcedureType, TcProcedureType,
TcQueueEntryType, TcQueueEntryType,
DefaultPusQueueHelper,
) )
from tmtccmd.config import default_json_path, SetupWrapper from tmtccmd.config import default_json_path, SetupWrapper
from tmtccmd.config.args import ( from tmtccmd.config.args import (
@ -88,7 +89,7 @@ class TcHandler(TcHandlerBase):
pus_verificator: PusVerificator, pus_verificator: PusVerificator,
file_logger: logging.Logger, file_logger: logging.Logger,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
gui: bool gui: bool,
): ):
super().__init__() super().__init__()
self.seq_count_provider = seq_count_provider self.seq_count_provider = seq_count_provider
@ -96,10 +97,19 @@ class TcHandler(TcHandlerBase):
self.file_logger = file_logger self.file_logger = file_logger
self.raw_logger = raw_logger self.raw_logger = raw_logger
self.gui = gui self.gui = gui
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=None,
pus_apid=PUS_APID,
seq_cnt_provider=seq_count_provider,
pus_verificator=pus_verificator,
)
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper): def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT: if info.proc_type == TcProcedureType.DEFAULT:
handle_default_procedure(info.to_def_procedure(), wrapper, self.gui) handle_default_procedure(
info.to_def_procedure(), self.queue_helper, self.gui
)
def send_cb(self, send_params: SendCbParams): def send_cb(self, send_params: SendCbParams):
entry_helper = send_params.entry entry_helper = send_params.entry
@ -151,7 +161,7 @@ def setup_tmtc(
verificator: PusVerificator, verificator: PusVerificator,
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper, raw_logger: RawTmtcTimedLogWrapper,
gui: bool gui: bool,
) -> (CcsdsTmHandler, TcHandler): ) -> (CcsdsTmHandler, TcHandler):
verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger) verification_wrapper = VerificationWrapper(verificator, LOGGER, printer.file_logger)
pus_handler = PusHandler(verification_wrapper, printer, raw_logger) pus_handler = PusHandler(verification_wrapper, printer, raw_logger)
@ -163,7 +173,7 @@ def setup_tmtc(
pus_verificator=verificator, pus_verificator=verificator,
file_logger=printer.file_logger, file_logger=printer.file_logger,
raw_logger=raw_logger, raw_logger=raw_logger,
gui=gui gui=gui,
) )
return ccsds_handler, tc_handler return ccsds_handler, tc_handler