update code to new tmtccmd version

This commit is contained in:
2022-07-03 20:58:32 +02:00
parent 4cc02d287d
commit d87e2971b4
13 changed files with 459 additions and 537 deletions

View File

@ -1,32 +1,22 @@
from tmtccmd.config import (
ServiceOpCodeDictT,
add_op_code_entry,
add_service_op_code_entry,
generate_op_code_options,
OpCodeDictKeys,
TmTcDefWrapper,
)
from tmtccmd.config.definitions import OpCodeOptionsT
from tmtccmd.config.globals import get_default_service_op_code_dict
from common_tmtc.pus_tc.pus_11_tc_sched import OpCodes, add_tc_sched_cmds
from common_tmtc.pus_tc.pus_11_tc_sched import add_tc_sched_cmds
from tmtccmd.config.globals import get_default_tmtc_defs
from tmtccmd.config.tmtc import OpCodeEntry
def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT:
service_op_code_dict = get_default_service_op_code_dict()
op_code = dict()
add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test")
add_op_code_entry(
op_code_dict=op_code,
keys=["0", "asm_to_normal"],
info="Command test assembly to normal mode",
options={OpCodeDictKeys.TIMEOUT: 6.0},
def common_fsfw_service_op_code_dict() -> TmTcDefWrapper:
def_wrapper = get_default_tmtc_defs()
op_code_entry = OpCodeEntry()
op_code_entry.add(keys="test", info="Mode CMD Test")
op_code_entry.add(
keys=["0", "asm_to_normal"], info="Command test assembly to normal mode"
)
add_service_op_code_entry(
srv_op_code_dict=service_op_code_dict,
name="200",
info="PUS Service 200 Mode MGMT",
op_code_entry=op_code,
def_wrapper.add_service(
"200", info="PUS Service 200 Mode MGMT", op_code_entry=op_code_entry
)
add_tc_sched_cmds(service_op_code_dict)
add_tc_sched_cmds(def_wrapper)
return service_op_code_dict
return def_wrapper

View File

@ -1,24 +1,20 @@
import struct
import time
from datetime import timedelta
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import (
QueueCommands,
ServiceOpCodeDictT,
add_op_code_entry,
generate_op_code_options,
add_service_op_code_entry,
)
from tmtccmd.config import TmTcDefWrapper
from tmtccmd.config.tmtc import OpCodeEntry
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_11_tc_sched import (
generate_enable_tc_sched_cmd,
generate_disable_tc_sched_cmd,
generate_reset_tc_sched_cmd,
pack_time_tagged_tc_app_data,
)
from tmtccmd.tc.definitions import TcQueueT
class OpCodes:
@ -40,106 +36,80 @@ class Info:
TEST_RESET = "Test TC scheduling reset command"
def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT):
op_code = dict()
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.ENABLE,
info=Info.ENABLE,
options=generate_op_code_options(custom_timeout=2.0),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.DISABLE,
info=Info.DISABLE,
options=generate_op_code_options(custom_timeout=2.0),
)
add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_INSERT,
info=Info.TEST_INSERT,
options=generate_op_code_options(custom_timeout=0.2),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_DELETE,
info=Info.TEST_DELETE,
options=generate_op_code_options(custom_timeout=0.2),
)
add_op_code_entry(
op_code_dict=op_code,
keys=OpCodes.TEST_RESET,
info=Info.TEST_RESET,
options=generate_op_code_options(custom_timeout=0.2),
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name="11",
info="PUS Service 11 TC Scheduling",
op_code_entry=op_code,
def add_tc_sched_cmds(cmd_dict: TmTcDefWrapper):
op_code_entry = OpCodeEntry()
op_code_entry.add(keys=OpCodes.ENABLE, info=Info.ENABLE)
op_code_entry.add(keys=OpCodes.DISABLE, info=Info.DISABLE)
op_code_entry.add(keys=OpCodes.RESET, info=Info.RESET)
op_code_entry.add(keys=OpCodes.TEST_INSERT, info=Info.TEST_INSERT)
op_code_entry.add(keys=OpCodes.TEST_DELETE, info=Info.TEST_DELETE)
op_code_entry.add(keys=OpCodes.TEST_RESET, info=Info.TEST_RESET)
cmd_dict.add_service(
"11", info="PUS Service 11 TC Scheduling", op_code_entry=op_code_entry
)
def __generic_pack_three_time_tagged_cmds(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Time-Tagged Command insertion"))
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
def __generic_pack_three_time_tagged_cmds(q: QueueHelper):
q.add_log_cmd("Testing Time-Tagged Command insertion")
q.add_pus_tc(generate_enable_tc_sched_cmd())
current_time = int(round(time.time()))
# these TC[17,1] (ping commands) shall be inserted
ping_tcs = [
pack_service_17_ping_command(1701),
pack_service_17_ping_command(1702),
pack_service_17_ping_command(1703),
pack_service_17_ping_command(),
pack_service_17_ping_command(),
pack_service_17_ping_command(),
]
for idx, tc in enumerate(ping_tcs):
release_time = current_time + (idx + 2) * 5
time_tagged_tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(struct.pack("!I", release_time), tc),
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", release_time), tc
),
)
)
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 25))
q.add_wait(timedelta(seconds=25.0))
def pack_service_11_commands(op_code: str, tc_queue: TcQueueT):
def pack_service_11_commands(op_code: str, q: QueueHelper):
if op_code in OpCodes.TEST_INSERT:
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Time-Tagged Command deletion")
)
__generic_pack_three_time_tagged_cmds(tc_queue=tc_queue)
q.add_log_cmd("Testing Time-Tagged Command deletion")
__generic_pack_three_time_tagged_cmds(q=q)
if op_code in OpCodes.TEST_DELETE:
current_time = int(round(time.time()))
tc_to_schedule = pack_service_17_ping_command(1703)
time_tagged_tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", current_time + 20), tc_to_schedule
),
tc_to_schedule = pack_service_17_ping_command()
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_time_tagged_tc_app_data(
struct.pack("!I", current_time + 20), tc_to_schedule
),
)
)
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
del_time_tagged_tcs = PusTelecommand(
service=11,
subservice=Subservices.TC_DELETE,
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
ssc=1105,
q.add_pus_tc(
PusTelecommand(
service=11,
subservice=Subservices.TC_DELETE,
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
)
)
tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple())
if op_code in OpCodes.ENABLE:
tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler"))
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
q.add_log_cmd("Enabling TC scheduler")
q.add_pus_tc(generate_enable_tc_sched_cmd())
if op_code in OpCodes.DISABLE:
tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler"))
tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple())
q.add_log_cmd("Disabling TC scheduler")
q.add_pus_tc(generate_disable_tc_sched_cmd())
if op_code in OpCodes.RESET:
tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler"))
tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple())
q.add_log_cmd("Reset TC scheduler")
q.add_pus_tc(generate_reset_tc_sched_cmd())
if op_code in OpCodes.TEST_RESET:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Reset command"))
__generic_pack_three_time_tagged_cmds(tc_queue=tc_queue)
tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple())
q.add_log_cmd("Testing Reset command")
__generic_pack_three_time_tagged_cmds(q)
q.add_pus_tc(generate_reset_tc_sched_cmd())
# a TC[11,5] for 3rd inserted ping TC
# TODO: This should be an independent test
# a TC[11,6] for some other previously inserted TCs
@ -177,7 +147,7 @@ def build_filter_delete_tc(
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161)
return PusTelecommand(service=11, subservice=6, app_data=app_data)
def pack_corresponding_timeshift_app_data(
@ -201,7 +171,7 @@ def build_filter_timeshift_tc(
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181)
return PusTelecommand(service=11, subservice=8, app_data=app_data)
# waits for a specified amount of seconds and prints ". . ." for each second

View File

@ -1,14 +1,12 @@
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.pus.pus_17_test import (
pack_service_17_ping_command,
pack_generic_service17_test,
)
from tmtccmd.tc import QueueHelper
def pack_service_17_commands(op_code: str, init_ssc: int, tc_queue: TcQueueT):
def pack_service_17_commands(op_code: str, q: QueueHelper):
if op_code == "0":
tc_queue.appendleft(
pack_service_17_ping_command(ssc=init_ssc).pack_command_tuple()
)
q.add_pus_tc(pack_service_17_ping_command())
else:
pack_generic_service17_test(tc_queue=tc_queue, init_ssc=init_ssc)
pack_generic_service17_test(q=q)

View File

@ -1,54 +1,40 @@
# -*- coding: utf-8 -*-
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, ASSEMBLY_ID
def pack_service_200_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_200_commands_into(q: QueueHelper, op_code: str):
if op_code == "test":
pack_service_200_test_into(tc_queue=tc_queue, init_ssc=2000)
pack_service_200_test_into(q)
elif op_code == "asm_to_normal" or op_code == "0":
# Set Normal mode
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 200: Set Mode Normal")
)
q.add_log_cmd("Testing Service 200: Set Mode Normal")
# Command assembly to normal, submode 1 for dual mode,
mode_data = pack_mode_data(ASSEMBLY_ID, Modes.NORMAL, 1)
command = PusTelecommand(service=200, subservice=1, ssc=0, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
def pack_service_200_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
new_ssc = init_ssc
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200"))
def pack_service_200_test_into(q: QueueHelper):
q.add_log_cmd("Testing Service 200")
# Object ID: DUMMY Device
object_id = TEST_DEVICE_0_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
q.add_log_cmd("Testing Service 200: Set Mode On")
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
q.add_log_cmd("Testing Service 200: Set Mode Normal")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
q.add_log_cmd("Testing Service 200: Set Mode Raw")
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
q.add_log_cmd("Testing Service 200: Set Mode Off")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
return new_ssc
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -2,8 +2,7 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_20_params import pack_type_and_matrix_data, pack_parameter_id
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
@ -13,73 +12,67 @@ from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
LOGGER = get_console_logger()
def pack_service20_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service20_commands_into(q: QueueHelper, op_code: str):
if op_code == "0":
pack_service20_test_into(tc_queue=tc_queue)
pack_service20_test_into(q=q)
def pack_service20_test_into(tc_queue: TcQueueT, called_externally: bool = False):
def pack_service20_test_into(q: QueueHelper, called_externally: bool = False):
if called_externally is False:
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20"))
q.add_log_cmd("Testing Service 20")
object_id = TEST_DEVICE_0_ID
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Set Normal Mode"))
q.add_log_cmd("Testing Service 20: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
load_param_0_simple_test_commands(tc_queue=tc_queue)
load_param_1_simple_test_commands(tc_queue=tc_queue)
load_param_2_simple_test_commands(tc_queue=tc_queue)
load_param_0_simple_test_commands(q)
load_param_1_simple_test_commands(q)
load_param_2_simple_test_commands(q)
def load_param_0_simple_test_commands(tc_queue: TcQueueT):
def load_param_0_simple_test_commands(q: QueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_0 = pack_parameter_id(domain_id=0, unique_id=0, linear_index=0)
# test checking Load for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load uint32_t"))
q.add_log_cmd("Testing Service 20: Load uint32_t")
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack("!I", 42)
payload = object_id + parameter_id_0 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for uint32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump uint32_t"))
q.add_log_cmd("Testing Service 20: Dump uint32_t")
payload = object_id + parameter_id_0
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
def load_param_1_simple_test_commands(tc_queue: TcQueueT):
def load_param_1_simple_test_commands(q: QueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_1 = pack_parameter_id(domain_id=0, unique_id=1, linear_index=0)
# test checking Load for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load int32_t"))
q.add_log_cmd("Testing Service 20: Load int32_t")
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack("!i", -42)
payload = object_id + parameter_id_1 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for int32_t
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Dump int32_t"))
q.add_log_cmd("Testing Service 20: Dump int32_t")
payload = object_id + parameter_id_1
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=129, app_data=payload))
def load_param_2_simple_test_commands(tc_queue: TcQueueT):
def load_param_2_simple_test_commands(q: QueueHelper):
object_id = TEST_DEVICE_0_ID
parameter_id_2 = pack_parameter_id(domain_id=0, unique_id=2, linear_index=0)
# test checking Load for float
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 20: Load float"))
q.add_log_cmd("Testing Service 20: Load float")
type_and_matrix_data = pack_type_and_matrix_data(ptc=5, pfc=1, rows=1, columns=3)
parameter_data = struct.pack("!fff", 4.2, -4.2, 49)
payload = object_id + parameter_id_2 + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=20, subservice=128, app_data=payload))
# test checking Dump for float
# Skip dump for now, still not properly implemented

View File

@ -9,73 +9,51 @@ import struct
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import Modes, pack_mode_data
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from common_tmtc.pus_tc import command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
def pack_service_2_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_2_commands_into(q: QueueHelper, op_code: str):
if op_code == "0":
pack_generic_service_2_test_into(0, tc_queue)
pack_generic_service_2_test_into(0, q)
else:
print(f"pack_service_2_test: Operation code {op_code} unknown!")
def pack_generic_service_2_test_into(init_ssc: int, tc_queue: TcQueueT) -> int:
def pack_generic_service_2_test_into(init_ssc: int, q: QueueHelper) -> int:
new_ssc = init_ssc
object_id = TEST_DEVICE_0_ID # dummy device
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Raw Mode"))
q.add_log_cmd("Testing Service 2: Setting Raw Mode")
mode_data = pack_mode_data(object_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# toggle wiretapping raw
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggling Wiretapping Raw")
)
q.add_log_cmd("Testing Service 2: Toggling Wiretapping Raw")
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1)
toggle_wiretapping_on_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
q.add_pus_tc(
PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data)
)
tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple())
new_ssc += 1
# send raw command, wiretapping should be returned via TM[2,130] and TC[2,131]
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Sending Raw Command"))
q.add_log_cmd("Testing Service 2: Sending Raw Command")
raw_command = cmd_data.TEST_COMMAND_0
raw_data = object_id + raw_command
raw_command = PusTelecommand(
service=2, subservice=128, ssc=new_ssc, app_data=raw_data
)
tc_queue.appendleft(raw_command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data))
# toggle wiretapping off
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Toggle Wiretapping Off")
)
q.add_log_cmd("Testing Service 2: Toggle Wiretapping Off")
wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0)
toggle_wiretapping_off_command = PusTelecommand(
service=2, subservice=129, ssc=new_ssc, app_data=wiretapping_toggle_data
q.add_pus_tc(
PusTelecommand(service=2, subservice=129, app_data=wiretapping_toggle_data)
)
tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple())
new_ssc += 1
# send raw command which should be returned via TM[2,130]
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 2: Send second raw command")
)
command = PusTelecommand(service=2, subservice=128, ssc=new_ssc, app_data=raw_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_log_cmd("Testing Service 2: Send second raw command")
q.add_pus_tc(PusTelecommand(service=2, subservice=128, app_data=raw_data))
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 2: Setting Off Mode"))
q.add_log_cmd("Testing Service 2: Setting Off Mode")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=new_ssc, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
new_ssc += 1
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
return new_ssc

View File

@ -1,6 +1,8 @@
from datetime import timedelta
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.tc.pus_20_params import (
pack_boolean_parameter_app_data,
@ -8,7 +10,6 @@ from tmtccmd.tc.pus_20_params import (
)
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
import tmtccmd.tc.pus_3_fsfw_hk as srv3
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID, TEST_DEVICE_1_ID
@ -24,7 +25,7 @@ TEST_NOTIFICATION_ACTION_ID = 3
PARAM_ACTIVATE_CHANGING_DATASETS = 4
def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_3_commands_into(q: QueueHelper, op_code: str):
current_ssc = 3000
device_idx = 0
if device_idx == 0:
@ -35,179 +36,120 @@ def pack_service_3_commands_into(tc_queue: TcQueueT, op_code: str):
if op_code == "0":
# This will pack all the tests
pack_service_3_test_info(
tc_queue=tc_queue,
init_ssc=current_ssc,
q=q,
object_id=object_id,
device_idx=device_idx,
)
elif op_code == "1":
# Extremely simple, generate one HK packet
pack_gen_one_hk_command(
tc_queue=tc_queue,
q=q,
device_idx=device_idx,
init_ssc=current_ssc,
object_id=object_id,
)
elif op_code == "2":
# Housekeeping basic test
pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
pack_housekeeping_basic_test(q=q, object_id=object_id)
elif op_code == "3":
# Notification demo
pack_notification_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
pack_notification_basic_test(q=q, object_id=object_id)
def pack_service_3_test_info(
tc_queue: TcQueueT, device_idx: int, object_id: bytearray, init_ssc: int
):
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 (Housekeeping Service): All tests")
)
current_ssc = init_ssc
current_ssc += pack_gen_one_hk_command(
tc_queue=tc_queue,
device_idx=device_idx,
def pack_service_3_test_info(q: QueueHelper, device_idx: int, object_id: bytearray):
q.add_log_cmd("Service 3 (Housekeeping Service): All tests")
pack_gen_one_hk_command(q=q, device_idx=device_idx, object_id=object_id)
pack_housekeeping_basic_test(q=q, object_id=object_id)
pack_notification_basic_test(
q=q,
object_id=object_id,
init_ssc=current_ssc,
)
current_ssc += pack_housekeeping_basic_test(
tc_queue=tc_queue, object_id=object_id, init_ssc=current_ssc
)
current_ssc += pack_notification_basic_test(
tc_queue=tc_queue,
object_id=object_id,
init_ssc=current_ssc,
enable_normal_mode=False,
)
def pack_gen_one_hk_command(
tc_queue: TcQueueT, device_idx: int, init_ssc: int, object_id: bytearray
) -> int:
def pack_gen_one_hk_command(q: QueueHelper, device_idx: int, object_id: bytearray):
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
tc_queue.appendleft(
(
QueueCommands.PRINT,
f"Service 3 Test: Generate one test set packet for "
f"test device {device_idx}",
)
q.add_log_cmd(
f"Service 3 Test: Generate one test set packet for test device {device_idx}"
)
command = generate_one_hk_command(ssc=init_ssc, sid=test_sid)
init_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
return init_ssc
q.add_pus_tc(generate_one_hk_command(sid=test_sid))
def pack_housekeeping_basic_test(
tc_queue: TcQueueT,
q: QueueHelper,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
):
"""
This basic test will request one HK packet, then it will enable periodic packets and listen
to the periodic packets for a few seconds. After that, HK packets will be disabled again.
"""
test_sid = make_sid(object_id=object_id, set_id=TEST_SET_ID)
current_ssc = init_ssc
# Enable changing datasets via parameter service (Service 20)
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing basic HK tests")
)
q.add_log_cmd("Service 3 Test: Performing basic HK tests")
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
q.add_log_cmd("Service 3 Test: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
tc_queue.appendleft((QueueCommands.PRINT, "Enabling changing datasets"))
q.add_log_cmd("Enabling changing datasets")
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=True,
)
cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)
current_ssc += 1
tc_queue.appendleft(cmd.pack_command_tuple())
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data, ssc=0))
# Enable periodic reporting
tc_queue.appendleft(
(QueueCommands.PRINT, "Enabling periodic thermal sensor packet generation: ")
q.add_log_cmd("Enabling periodic thermal sensor packet generation: ")
q.add_pus_tc(
PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN,
app_data=test_sid,
)
)
command = PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_ENABLE_PERIODIC_HK_GEN,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.WAIT, 2.0))
q.add_wait(timedelta(seconds=2.0))
# Disable periodic reporting
tc_queue.appendleft(
(QueueCommands.PRINT, "Disabling periodic thermal sensor packet generation: ")
q.add_log_cmd("Disabling periodic thermal sensor packet generation: ")
q.add_pus_tc(
PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN,
app_data=test_sid,
)
)
command = PusTelecommand(
service=3,
subservice=srv3.Subservices.TC_DISABLE_PERIODIC_HK_GEN,
ssc=current_ssc,
app_data=test_sid,
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
# Disable changing datasets via parameter service (Service 20)
tc_queue.appendleft((QueueCommands.PRINT, "Disabling changing datasets"))
q.add_log_cmd("Disabling changing datasets")
app_data = pack_boolean_parameter_app_data(
object_id=object_id,
domain_id=0,
unique_id=PARAM_ACTIVATE_CHANGING_DATASETS,
parameter=False,
)
current_ssc += 1
cmd = pack_fsfw_load_param_cmd(app_data=app_data, ssc=0)
tc_queue.appendleft(cmd.pack_command_tuple())
return current_ssc
q.add_pus_tc(pack_fsfw_load_param_cmd(app_data=app_data, ssc=0))
def pack_notification_basic_test(
tc_queue: TcQueueT,
q: QueueHelper,
object_id: bytearray,
init_ssc: int,
enable_normal_mode: bool = True,
) -> int:
current_ssc = init_ssc
tc_queue.appendleft(
(QueueCommands.PRINT, "Service 3 Test: Performing notification tests")
)
):
q.add_log_cmd("Service 3 Test: Performing notification tests")
if enable_normal_mode:
# Set mode normal so that sets are changed/read regularly
tc_queue.appendleft((QueueCommands.PRINT, "Service 3 Test: Set Normal Mode"))
q.add_log_cmd("Service 3 Test: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(
service=200, subservice=1, ssc=current_ssc, app_data=mode_data
)
current_ssc += 1
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
tc_queue.appendleft((QueueCommands.PRINT, "Triggering notification"))
command = generate_action_command(
object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID, ssc=current_ssc
q.add_log_cmd("Triggering notification")
q.add_pus_tc(
generate_action_command(
object_id=object_id, action_id=TEST_NOTIFICATION_ACTION_ID
)
)
tc_queue.appendleft(command.pack_command_tuple())
current_ssc += 1
return current_ssc

View File

@ -1,7 +1,5 @@
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import common_tmtc.pus_tc.command_data as cmd_data
@ -10,53 +8,42 @@ import common_tmtc.pus_tc.command_data as cmd_data
from common_tmtc.config.object_ids import TEST_DEVICE_0_ID
def pack_service_8_commands_into(tc_queue: TcQueueT, op_code: str):
def pack_service_8_commands_into(q: QueueHelper, op_code: str):
if op_code == "0":
pack_generic_service_8_test_into(tc_queue=tc_queue)
pack_generic_service_8_test_into(q=q)
else:
print(f"pack_service_8_test: Operation code {op_code} unknown!")
def pack_generic_service_8_test_into(tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8"))
def pack_generic_service_8_test_into(q: QueueHelper):
q.add_log_cmd("Testing Service 8")
object_id = TEST_DEVICE_0_ID
# set mode on
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set On Mode"))
q.add_log_cmd("Testing Service 8: Set On Mode")
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# set mode normal
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Normal Mode"))
q.add_log_cmd("Testing Service 8: Set Normal Mode")
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))
# Direct command which triggers completion reply
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Service 8: Trigger Step and Completion Reply")
)
q.add_log_cmd("Testing Service 8: Trigger Step and Completion Reply")
action_id = cmd_data.TEST_COMMAND_0
direct_command = object_id + action_id
command = PusTelecommand(
service=8, subservice=128, ssc=820, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command))
# Direct command which triggers _tm_data reply
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Trigger Data Reply"))
q.add_log_cmd("Testing Service 8: Trigger Data Reply")
action_id = cmd_data.TEST_COMMAND_1
command_param1 = cmd_data.TEST_COMMAND_1_PARAM_1
command_param2 = cmd_data.TEST_COMMAND_1_PARAM_2
direct_command = object_id + action_id + command_param1 + command_param2
command = PusTelecommand(
service=8, subservice=128, ssc=830, app_data=direct_command
)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=direct_command))
# Set mode off
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 8: Set Off Mode"))
q.add_log_cmd("Testing Service 8: Set Off Mode")
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=mode_data))

View File

@ -16,7 +16,7 @@ from tmtccmd.logging import get_console_logger, get_current_time_string
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into
from tmtccmd.tc.pus_5_event import pack_generic_service_5_test_into
from tmtccmd.pus.pus_17_test import pack_generic_service17_test
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
@ -44,31 +44,6 @@ def pre_tc_send_cb(
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=queue_entry)
elif isinstance(queue_entry, QueueCommands):
if queue_entry == QueueCommands.PRINT:
file_logger.info(queue_info)
def common_service_queue_user(
service: Union[str, int], op_code: str, tc_queue: TcQueueT
):
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_3.value:
return pack_service_3_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_5.value:
return pack_generic_service5_test_into(tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_11.value:
return pack_service_11_commands(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_17.value:
return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0)
if service == CoreServiceList.SERVICE_20.value:
return pack_service20_commands_into(tc_queue=tc_queue, op_code=op_code)
if service == CoreServiceList.SERVICE_200.value:
return pack_service_200_commands_into(tc_queue=tc_queue, op_code=op_code)
LOGGER.warning("Invalid Service !")
def create_total_tc_queue_user() -> TcQueueT: