update for new default pus queue helper
This commit is contained in:
parent
57aeba3de0
commit
6de31ccd3f
59
common.py
59
common.py
@ -4,6 +4,7 @@ import sys
|
|||||||
from tmtccmd.com_if import ComInterface
|
from tmtccmd.com_if import ComInterface
|
||||||
from tmtccmd.logging import get_current_time_string
|
from tmtccmd.logging import get_current_time_string
|
||||||
from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices
|
from tmtccmd.pus.pus_11_tc_sched import Subservices as Pus11Subservices
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
|
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
|
||||||
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
|
||||||
|
|
||||||
@ -21,7 +22,7 @@ except ImportError as error:
|
|||||||
print("Python tmtccmd module could not be imported. Make sure it is installed")
|
print("Python tmtccmd module could not be imported. Make sure it is installed")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
from spacepackets.ecss import PusVerificator, PusTelecommand
|
from spacepackets.ecss import PusVerificator, PusTelecommand, PusServices
|
||||||
|
|
||||||
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
|
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
|
||||||
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
|
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
|
||||||
@ -76,6 +77,7 @@ class PusHandler(SpecificApidHandlerBase):
|
|||||||
class TcHandler(TcHandlerBase):
|
class TcHandler(TcHandlerBase):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
apid: int,
|
||||||
seq_count_provider: FileSeqCountProvider,
|
seq_count_provider: FileSeqCountProvider,
|
||||||
pus_verificator: PusVerificator,
|
pus_verificator: PusVerificator,
|
||||||
file_logger: logging.Logger,
|
file_logger: logging.Logger,
|
||||||
@ -86,37 +88,44 @@ class TcHandler(TcHandlerBase):
|
|||||||
self.pus_verificator = pus_verificator
|
self.pus_verificator = pus_verificator
|
||||||
self.file_logger = file_logger
|
self.file_logger = file_logger
|
||||||
self.raw_logger = raw_logger
|
self.raw_logger = raw_logger
|
||||||
|
self.queue_helper = DefaultPusQueueHelper(
|
||||||
|
queue_wrapper=None,
|
||||||
|
apid=apid,
|
||||||
|
seq_cnt_provider=seq_count_provider,
|
||||||
|
pus_verificator=pus_verificator,
|
||||||
|
)
|
||||||
|
|
||||||
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
|
def feed_cb(self, info: ProcedureHelper, wrapper: FeedWrapper):
|
||||||
|
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
|
||||||
if info.proc_type == TcProcedureType.DEFAULT:
|
if info.proc_type == TcProcedureType.DEFAULT:
|
||||||
def_proc = info.to_def_procedure()
|
def_proc = info.to_def_procedure()
|
||||||
service = def_proc.service
|
service = def_proc.service
|
||||||
op_code = def_proc.op_code
|
op_code = def_proc.op_code
|
||||||
if service == CoreServiceList.SERVICE_2.value:
|
if service == CoreServiceList.SERVICE_2.value:
|
||||||
return pack_service_2_commands_into(
|
return pack_service_2_commands_into(
|
||||||
op_code=op_code, q=wrapper.queue_helper
|
op_code=op_code, q=self.queue_helper
|
||||||
)
|
)
|
||||||
if service == CoreServiceList.SERVICE_3.value:
|
if service == CoreServiceList.SERVICE_3.value:
|
||||||
return pack_service_3_commands_into(
|
return pack_service_3_commands_into(
|
||||||
op_code=op_code, q=wrapper.queue_helper
|
op_code=op_code, q=self.queue_helper
|
||||||
)
|
)
|
||||||
if service == CoreServiceList.SERVICE_5.value:
|
if service == CoreServiceList.SERVICE_5.value:
|
||||||
return pack_generic_service_5_test_into(q=wrapper.queue_helper)
|
return pack_generic_service_5_test_into(q=self.queue_helper)
|
||||||
if service == CoreServiceList.SERVICE_8.value:
|
if service == CoreServiceList.SERVICE_8.value:
|
||||||
return pack_service_8_commands_into(
|
return pack_service_8_commands_into(
|
||||||
op_code=op_code, q=wrapper.queue_helper
|
op_code=op_code, q=self.queue_helper
|
||||||
)
|
)
|
||||||
if service == CoreServiceList.SERVICE_11.value:
|
if service == CoreServiceList.SERVICE_11.value:
|
||||||
return pack_service_11_commands(op_code=op_code, q=wrapper.queue_helper)
|
return pack_service_11_commands(op_code=op_code, q=self.queue_helper)
|
||||||
if service == CoreServiceList.SERVICE_17.value:
|
if service == CoreServiceList.SERVICE_17.value:
|
||||||
return pack_service_17_commands(op_code=op_code, q=wrapper.queue_helper)
|
return pack_service_17_commands(op_code=op_code, q=self.queue_helper)
|
||||||
if service == CoreServiceList.SERVICE_20.value:
|
if service == CoreServiceList.SERVICE_20.value:
|
||||||
return pack_service20_commands_into(
|
return pack_service20_commands_into(
|
||||||
q=wrapper.queue_helper, op_code=op_code
|
q=self.queue_helper, op_code=op_code
|
||||||
)
|
)
|
||||||
if service == CoreServiceList.SERVICE_200.value:
|
if service == CoreServiceList.SERVICE_200.value:
|
||||||
return pack_service_200_commands_into(
|
return pack_service_200_commands_into(
|
||||||
q=wrapper.queue_helper, op_code=op_code
|
q=self.queue_helper, op_code=op_code
|
||||||
)
|
)
|
||||||
LOGGER.warning("Invalid Service !")
|
LOGGER.warning("Invalid Service !")
|
||||||
|
|
||||||
@ -131,36 +140,19 @@ class TcHandler(TcHandlerBase):
|
|||||||
|
|
||||||
def handle_tc_send_cb(self, params: SendCbParams):
|
def handle_tc_send_cb(self, params: SendCbParams):
|
||||||
pus_tc_wrapper = params.entry.to_pus_tc_entry()
|
pus_tc_wrapper = params.entry.to_pus_tc_entry()
|
||||||
# TODO: All of this stuff should be done during queue insertion time
|
|
||||||
# This requires the queue helper to be optionally able to perform TC
|
|
||||||
# post-processing via a callback or something similar. Then an API can be
|
|
||||||
# added which is also able to pack time tagged TCs and stamping both TCs
|
|
||||||
pus_tc_wrapper.pus_tc.seq_count = (
|
|
||||||
self.seq_count_provider.get_and_increment()
|
|
||||||
)
|
|
||||||
pus_tc_wrapper.pus_tc.apid = EXAMPLE_APID
|
|
||||||
if (
|
if (
|
||||||
pus_tc_wrapper.pus_tc.service == 11
|
pus_tc_wrapper.pus_tc.service == PusServices.S11_TC_SCHED
|
||||||
and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT
|
and pus_tc_wrapper.pus_tc.subservice == Pus11Subservices.TC_INSERT
|
||||||
):
|
):
|
||||||
try:
|
wrapped_tc = PusTelecommand.unpack(pus_tc_wrapper.pus_tc.app_data[4:])
|
||||||
pus_tc = PusTelecommand.unpack(
|
tc_info_string = f"Sending time-tagged command {wrapped_tc}"
|
||||||
pus_tc_wrapper.pus_tc.app_data[4:]
|
LOGGER.info(tc_info_string)
|
||||||
)
|
self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
|
||||||
self.pus_verificator.add_tc(pus_tc)
|
|
||||||
except ValueError as e:
|
|
||||||
LOGGER.warning(
|
|
||||||
f"Attempt of unpacking time tagged TC failed with exception {e}"
|
|
||||||
)
|
|
||||||
# Add TC after Sequence Count stamping
|
|
||||||
self.pus_verificator.add_tc(pus_tc_wrapper.pus_tc)
|
|
||||||
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
||||||
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
|
self.raw_logger.log_tc(pus_tc_wrapper.pus_tc)
|
||||||
tc_info_string = f"Sent {pus_tc_wrapper.pus_tc}"
|
tc_info_string = f"Sending {pus_tc_wrapper.pus_tc}"
|
||||||
LOGGER.info(tc_info_string)
|
LOGGER.info(tc_info_string)
|
||||||
self.file_logger.info(
|
self.file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
|
||||||
f"{get_current_time_string(True)}: {tc_info_string}"
|
|
||||||
)
|
|
||||||
params.com_if.send(raw_tc)
|
params.com_if.send(raw_tc)
|
||||||
|
|
||||||
def queue_finished_cb(self, info: ProcedureHelper):
|
def queue_finished_cb(self, info: ProcedureHelper):
|
||||||
@ -196,6 +188,7 @@ def setup_tmtc_handlers(
|
|||||||
ccsds_handler = CcsdsTmHandler(None)
|
ccsds_handler = CcsdsTmHandler(None)
|
||||||
ccsds_handler.add_apid_handler(pus_handler)
|
ccsds_handler.add_apid_handler(pus_handler)
|
||||||
tc_handler = TcHandler(
|
tc_handler = TcHandler(
|
||||||
|
apid=EXAMPLE_APID,
|
||||||
file_logger=printer.file_logger,
|
file_logger=printer.file_logger,
|
||||||
raw_logger=raw_logger,
|
raw_logger=raw_logger,
|
||||||
pus_verificator=verif_wrapper.pus_verificator,
|
pus_verificator=verif_wrapper.pus_verificator,
|
||||||
|
@ -8,13 +8,13 @@ from tmtccmd.config import TmTcDefWrapper
|
|||||||
from tmtccmd.config.tmtc import OpCodeEntry
|
from tmtccmd.config.tmtc import OpCodeEntry
|
||||||
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
|
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
|
||||||
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
|
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
|
||||||
from tmtccmd.tc import QueueHelper
|
|
||||||
from tmtccmd.tc.pus_11_tc_sched import (
|
from tmtccmd.tc.pus_11_tc_sched import (
|
||||||
generate_enable_tc_sched_cmd,
|
generate_enable_tc_sched_cmd,
|
||||||
generate_disable_tc_sched_cmd,
|
generate_disable_tc_sched_cmd,
|
||||||
generate_reset_tc_sched_cmd,
|
generate_reset_tc_sched_cmd,
|
||||||
pack_time_tagged_tc_app_data,
|
pack_time_tagged_tc_app_data,
|
||||||
)
|
)
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
class OpCodes:
|
class OpCodes:
|
||||||
@ -49,7 +49,7 @@ def add_tc_sched_cmds(cmd_dict: TmTcDefWrapper):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def __generic_pack_three_time_tagged_cmds(q: QueueHelper):
|
def __generic_pack_three_time_tagged_cmds(q: DefaultPusQueueHelper):
|
||||||
q.add_log_cmd("Testing Time-Tagged Command insertion")
|
q.add_log_cmd("Testing Time-Tagged Command insertion")
|
||||||
q.add_pus_tc(generate_enable_tc_sched_cmd())
|
q.add_pus_tc(generate_enable_tc_sched_cmd())
|
||||||
current_time = int(round(time.time()))
|
current_time = int(round(time.time()))
|
||||||
@ -74,7 +74,7 @@ def __generic_pack_three_time_tagged_cmds(q: QueueHelper):
|
|||||||
q.add_wait(timedelta(seconds=25.0))
|
q.add_wait(timedelta(seconds=25.0))
|
||||||
|
|
||||||
|
|
||||||
def pack_service_11_commands(op_code: str, q: QueueHelper):
|
def pack_service_11_commands(op_code: str, q: DefaultPusQueueHelper):
|
||||||
if op_code in OpCodes.TEST_INSERT:
|
if op_code in OpCodes.TEST_INSERT:
|
||||||
q.add_log_cmd("Testing Time-Tagged Command deletion")
|
q.add_log_cmd("Testing Time-Tagged Command deletion")
|
||||||
__generic_pack_three_time_tagged_cmds(q=q)
|
__generic_pack_three_time_tagged_cmds(q=q)
|
||||||
|
@ -2,10 +2,10 @@ from tmtccmd.pus.pus_17_test import (
|
|||||||
pack_service_17_ping_command,
|
pack_service_17_ping_command,
|
||||||
pack_generic_service17_test,
|
pack_generic_service17_test,
|
||||||
)
|
)
|
||||||
from tmtccmd.tc import QueueHelper
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
def pack_service_17_commands(op_code: str, q: QueueHelper):
|
def pack_service_17_commands(op_code: str, q: DefaultPusQueueHelper):
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
q.add_pus_tc(pack_service_17_ping_command())
|
q.add_pus_tc(pack_service_17_ping_command())
|
||||||
else:
|
else:
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
from tmtccmd.tc import QueueHelper
|
from tmtccmd.tc import QueueHelperBase
|
||||||
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
||||||
|
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
def pack_service_200_commands_into(q: QueueHelper, op_code: str):
|
def pack_service_200_commands_into(q: DefaultPusQueueHelper, op_code: str):
|
||||||
if op_code == "test":
|
if op_code == "test":
|
||||||
pack_service_200_test_into(q)
|
pack_service_200_test_into(q)
|
||||||
elif op_code == "asm_to_normal" or op_code == "0":
|
elif op_code == "asm_to_normal" or op_code == "0":
|
||||||
@ -18,7 +19,7 @@ def pack_service_200_commands_into(q: QueueHelper, op_code: str):
|
|||||||
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
|
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
|
||||||
|
|
||||||
|
|
||||||
def pack_service_200_test_into(q: QueueHelper):
|
def pack_service_200_test_into(q: DefaultPusQueueHelper):
|
||||||
q.add_log_cmd("Testing Service 200")
|
q.add_log_cmd("Testing Service 200")
|
||||||
# Object ID: DUMMY Device
|
# Object ID: DUMMY Device
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
|
@ -4,12 +4,13 @@ from spacepackets.ecss.tc import PusTelecommand
|
|||||||
from spacepackets.ecss import PusServices
|
from spacepackets.ecss import PusServices
|
||||||
|
|
||||||
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
|
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
|
||||||
from tmtccmd.tc import QueueHelper
|
from tmtccmd.tc import QueueHelperBase
|
||||||
from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id
|
from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id
|
||||||
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
||||||
from tmtccmd.logging import get_console_logger
|
from tmtccmd.logging import get_console_logger
|
||||||
|
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
LOGGER = get_console_logger()
|
LOGGER = get_console_logger()
|
||||||
|
|
||||||
@ -20,16 +21,16 @@ def add_param_cmds(defs: TmTcDefWrapper):
|
|||||||
defs.add_service(
|
defs.add_service(
|
||||||
name=str(PusServices.S20_PARAMETER.value),
|
name=str(PusServices.S20_PARAMETER.value),
|
||||||
info="PUS Service 20 Parameters",
|
info="PUS Service 20 Parameters",
|
||||||
op_code_entry=op_code_entry
|
op_code_entry=op_code_entry,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def pack_service20_commands_into(q: QueueHelper, op_code: str):
|
def pack_service20_commands_into(q: DefaultPusQueueHelper, op_code: str):
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
pack_service20_test_into(q=q)
|
pack_service20_test_into(q=q)
|
||||||
|
|
||||||
|
|
||||||
def pack_service20_test_into(q: QueueHelper, called_externally: bool = False):
|
def pack_service20_test_into(q: DefaultPusQueueHelper, called_externally: bool = False):
|
||||||
if called_externally is False:
|
if called_externally is False:
|
||||||
q.add_log_cmd("Testing Service 20")
|
q.add_log_cmd("Testing Service 20")
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
@ -48,7 +49,7 @@ def pack_service20_test_into(q: QueueHelper, called_externally: bool = False):
|
|||||||
load_param_2_simple_test_commands(q)
|
load_param_2_simple_test_commands(q)
|
||||||
|
|
||||||
|
|
||||||
def load_param_0_simple_test_commands(q: QueueHelper):
|
def load_param_0_simple_test_commands(q: DefaultPusQueueHelper):
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
|
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
|
||||||
# test checking Load for uint32_t
|
# test checking Load for uint32_t
|
||||||
@ -64,7 +65,7 @@ def load_param_0_simple_test_commands(q: QueueHelper):
|
|||||||
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
|
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
|
||||||
|
|
||||||
|
|
||||||
def load_param_1_simple_test_commands(q: QueueHelper):
|
def load_param_1_simple_test_commands(q: DefaultPusQueueHelper):
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0)
|
parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0)
|
||||||
# test checking Load for int32_t
|
# test checking Load for int32_t
|
||||||
@ -80,7 +81,7 @@ def load_param_1_simple_test_commands(q: QueueHelper):
|
|||||||
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
|
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
|
||||||
|
|
||||||
|
|
||||||
def load_param_2_simple_test_commands(q: QueueHelper):
|
def load_param_2_simple_test_commands(q: DefaultPusQueueHelper):
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0)
|
parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0)
|
||||||
# test checking Load for float
|
# test checking Load for float
|
||||||
|
@ -9,21 +9,21 @@ import struct
|
|||||||
|
|
||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
|
|
||||||
from tmtccmd.tc import QueueHelper
|
|
||||||
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data
|
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data
|
||||||
|
|
||||||
from common_tmtc.pus_tc import command_data as cmd_data
|
from common_tmtc.pus_tc import command_data as cmd_data
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
def pack_service_2_commands_into(q: QueueHelper, op_code: str):
|
def pack_service_2_commands_into(q: DefaultPusQueueHelper, op_code: str):
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
pack_generic_service_2_test_into(0, q)
|
pack_generic_service_2_test_into(0, q)
|
||||||
else:
|
else:
|
||||||
print(f"pack_service_2_test: Operation code {op_code} unknown!")
|
print(f"pack_service_2_test: Operation code {op_code} unknown!")
|
||||||
|
|
||||||
|
|
||||||
def pack_generic_service_2_test_into(init_ssc: int, q: QueueHelper) -> int:
|
def pack_generic_service_2_test_into(init_ssc: int, q: DefaultPusQueueHelper) -> int:
|
||||||
new_ssc = init_ssc
|
new_ssc = init_ssc
|
||||||
object_id = TEST_DEVICE_0_ID # dummy device
|
object_id = TEST_DEVICE_0_ID # dummy device
|
||||||
# Set Raw Mode
|
# Set Raw Mode
|
||||||
|
@ -4,7 +4,6 @@ from spacepackets.ecss.tc import PusTelecommand
|
|||||||
|
|
||||||
from deps.spacepackets.spacepackets.ecss import PusServices
|
from deps.spacepackets.spacepackets.ecss import PusServices
|
||||||
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
|
from tmtccmd.config import TmTcDefWrapper, OpCodeEntry
|
||||||
from tmtccmd.tc import QueueHelper
|
|
||||||
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
||||||
from tmtccmd.tc.pus_20_params import (
|
from tmtccmd.tc.pus_20_params import (
|
||||||
pack_boolean_parameter_app_data,
|
pack_boolean_parameter_app_data,
|
||||||
@ -18,6 +17,8 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
|
|||||||
|
|
||||||
|
|
||||||
# Set IDs
|
# Set IDs
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
TEST_SET_ID = 0
|
TEST_SET_ID = 0
|
||||||
|
|
||||||
# Action IDs
|
# Action IDs
|
||||||
@ -33,11 +34,11 @@ def add_hk_cmds(defs: TmTcDefWrapper):
|
|||||||
defs.add_service(
|
defs.add_service(
|
||||||
name=str(PusServices.S3_HOUSEKEEPING.value),
|
name=str(PusServices.S3_HOUSEKEEPING.value),
|
||||||
info="PUS Service 3 Housekeeping",
|
info="PUS Service 3 Housekeeping",
|
||||||
op_code_entry=op_code_entry
|
op_code_entry=op_code_entry,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def pack_service_3_commands_into(q: QueueHelper, op_code: str):
|
def pack_service_3_commands_into(q: DefaultPusQueueHelper, op_code: str):
|
||||||
current_ssc = 3000
|
current_ssc = 3000
|
||||||
device_idx = 0
|
device_idx = 0
|
||||||
if device_idx == 0:
|
if device_idx == 0:
|
||||||
@ -67,7 +68,9 @@ def pack_service_3_commands_into(q: QueueHelper, op_code: str):
|
|||||||
pack_notification_basic_test(q=q, object_id=object_id)
|
pack_notification_basic_test(q=q, object_id=object_id)
|
||||||
|
|
||||||
|
|
||||||
def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearray):
|
def pack_service_3_test_info(
|
||||||
|
q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray
|
||||||
|
):
|
||||||
q.add_log_cmd("Service 3 (Housekeeping Service): All tests")
|
q.add_log_cmd("Service 3 (Housekeeping Service): All tests")
|
||||||
pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id)
|
pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id)
|
||||||
pack_housekeeping_basic_test(q=q, object_id=object_id)
|
pack_housekeeping_basic_test(q=q, object_id=object_id)
|
||||||
@ -78,7 +81,9 @@ def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearr
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearray):
|
def pack_gen_one_hk_command(
|
||||||
|
q: DefaultPusQueueHelper, device_idx: int, object_id: bytearray
|
||||||
|
):
|
||||||
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
|
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
|
||||||
q.add_log_cmd(
|
q.add_log_cmd(
|
||||||
f"Service 3 Test: Generate one test set packet for test device {device_idx}"
|
f"Service 3 Test: Generate one test set packet for test device {device_idx}"
|
||||||
@ -87,7 +92,7 @@ def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearra
|
|||||||
|
|
||||||
|
|
||||||
def pack_housekeeping_basic_test(
|
def pack_housekeeping_basic_test(
|
||||||
q: QueueHelper,
|
q: DefaultPusQueueHelper,
|
||||||
object_id: bytearray,
|
object_id: bytearray,
|
||||||
enable_normal_mode: bool = True,
|
enable_normal_mode: bool = True,
|
||||||
):
|
):
|
||||||
@ -147,7 +152,7 @@ def pack_housekeeping_basic_test(
|
|||||||
|
|
||||||
|
|
||||||
def pack_notification_basic_test(
|
def pack_notification_basic_test(
|
||||||
q: QueueHelper,
|
q: DefaultPusQueueHelper,
|
||||||
object_id: bytearray,
|
object_id: bytearray,
|
||||||
enable_normal_mode: bool = True,
|
enable_normal_mode: bool = True,
|
||||||
):
|
):
|
||||||
|
@ -1,21 +1,21 @@
|
|||||||
from spacepackets.ecss.tc import PusTelecommand
|
from spacepackets.ecss.tc import PusTelecommand
|
||||||
from tmtccmd.tc import QueueHelper
|
|
||||||
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
|
||||||
|
|
||||||
import common_tmtc.pus_tc.command_data as cmd_data
|
import common_tmtc.pus_tc.command_data as cmd_data
|
||||||
|
|
||||||
|
|
||||||
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
|
||||||
|
from tmtccmd.tc.queue import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
|
||||||
def pack_service_8_commands_into(q: QueueHelper, op_code: str):
|
def pack_service_8_commands_into(q: DefaultPusQueueHelper, op_code: str):
|
||||||
if op_code == "0":
|
if op_code == "0":
|
||||||
pack_generic_service_8_test_into(q=q)
|
pack_generic_service_8_test_into(q=q)
|
||||||
else:
|
else:
|
||||||
print(f"pack_service_8_test: Operation code {op_code} unknown!")
|
print(f"pack_service_8_test: Operation code {op_code} unknown!")
|
||||||
|
|
||||||
|
|
||||||
def pack_generic_service_8_test_into(q: QueueHelper):
|
def pack_generic_service_8_test_into(q: DefaultPusQueueHelper):
|
||||||
q.add_log_cmd("Testing Service 8")
|
q.add_log_cmd("Testing Service 8")
|
||||||
object_id = TEST_DEVICE_0_ID
|
object_id = TEST_DEVICE_0_ID
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user