213 lines
7.3 KiB
Python
213 lines
7.3 KiB
Python
import struct
|
|
import time
|
|
|
|
from spacepackets.ecss.tc import PusTelecommand
|
|
|
|
from tmtccmd.config import (
|
|
QueueCommands,
|
|
ServiceOpCodeDictT,
|
|
add_op_code_entry,
|
|
generate_op_code_options,
|
|
add_service_op_code_entry,
|
|
)
|
|
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
|
|
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
|
|
from tmtccmd.tc.pus_11_tc_sched import (
|
|
generate_enable_tc_sched_cmd,
|
|
generate_disable_tc_sched_cmd,
|
|
generate_reset_tc_sched_cmd,
|
|
)
|
|
from tmtccmd.tc.definitions import TcQueueT
|
|
|
|
|
|
class OpCodes:
|
|
ENABLE = ["0", "enable"]
|
|
DISABLE = ["1", "disable"]
|
|
RESET = ["2", "reset"]
|
|
|
|
INSERT = ["12", "test-insert"]
|
|
DELETE = ["13", "del-test"]
|
|
|
|
|
|
class Info:
|
|
ENABLE = "Enable TC scheduling"
|
|
DISABLE = "Disable TC scheduling"
|
|
RESET = "Reset TC scheduling"
|
|
INSERT = "Test TC scheduling insertion"
|
|
DELETE = "Test TC scheduling deletion"
|
|
|
|
|
|
def add_tc_sched_cmds(cmd_dict: ServiceOpCodeDictT):
|
|
op_code = dict()
|
|
add_op_code_entry(
|
|
op_code_dict=op_code,
|
|
keys=OpCodes.ENABLE,
|
|
info=Info.ENABLE,
|
|
options=generate_op_code_options(custom_timeout=2.0),
|
|
)
|
|
add_op_code_entry(
|
|
op_code_dict=op_code,
|
|
keys=OpCodes.DISABLE,
|
|
info=Info.DISABLE,
|
|
options=generate_op_code_options(custom_timeout=2.0),
|
|
)
|
|
add_op_code_entry(op_code_dict=op_code, keys=OpCodes.RESET, info=Info.RESET)
|
|
add_op_code_entry(
|
|
op_code_dict=op_code,
|
|
keys=OpCodes.INSERT,
|
|
info=Info.INSERT,
|
|
options=generate_op_code_options(custom_timeout=0.2),
|
|
)
|
|
add_op_code_entry(
|
|
op_code_dict=op_code,
|
|
keys=OpCodes.DELETE,
|
|
info=Info.DELETE,
|
|
options=generate_op_code_options(custom_timeout=0.2),
|
|
)
|
|
|
|
add_service_op_code_entry(
|
|
srv_op_code_dict=cmd_dict,
|
|
name="11",
|
|
info="PUS Service 11 TC Scheduling",
|
|
op_code_entry=op_code,
|
|
)
|
|
|
|
|
|
def pack_service_11_commands(op_code: str, tc_queue: TcQueueT):
|
|
if op_code in OpCodes.INSERT:
|
|
tc_queue.appendleft(
|
|
(QueueCommands.PRINT, "Testing Time-Tagged Command insertion")
|
|
)
|
|
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
|
|
current_time = int(round(time.time()))
|
|
|
|
# these TC[17,1] (ping commands) shall be inserted
|
|
ping_tcs = [
|
|
pack_service_17_ping_command(1701),
|
|
pack_service_17_ping_command(1702),
|
|
pack_service_17_ping_command(1703),
|
|
]
|
|
for idx, tc in enumerate(ping_tcs):
|
|
release_time = current_time + (idx + 2) * 5
|
|
time_tagged_tc = PusTelecommand(
|
|
service=11,
|
|
subservice=Subservices.TC_INSERT,
|
|
app_data=pack_insert_tc_app_data(struct.pack("!I", release_time), tc),
|
|
)
|
|
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
|
|
tc_queue.appendleft((QueueCommands.WAIT, 25))
|
|
if op_code in OpCodes.DELETE:
|
|
tc_queue.appendleft(
|
|
(QueueCommands.PRINT, "Testing Time-Tagged Command deletion")
|
|
)
|
|
current_time = int(round(time.time()))
|
|
tc_to_schedule = pack_service_17_ping_command(1703)
|
|
time_tagged_tc = PusTelecommand(
|
|
service=11,
|
|
subservice=Subservices.TC_INSERT,
|
|
app_data=pack_insert_tc_app_data(
|
|
struct.pack("!I", current_time + 20), tc_to_schedule
|
|
),
|
|
)
|
|
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
|
|
del_time_tagged_tcs = PusTelecommand(
|
|
service=11,
|
|
subservice=Subservices.TC_DELETE,
|
|
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
|
|
ssc=1105,
|
|
)
|
|
tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple())
|
|
if op_code in OpCodes.ENABLE:
|
|
tc_queue.appendleft((QueueCommands.PRINT, "Enabling TC scheduler"))
|
|
tc_queue.appendleft(generate_enable_tc_sched_cmd(ssc=0).pack_command_tuple())
|
|
if op_code in OpCodes.DISABLE:
|
|
tc_queue.appendleft((QueueCommands.PRINT, "Disabling TC scheduler"))
|
|
tc_queue.appendleft(generate_disable_tc_sched_cmd(ssc=0).pack_command_tuple())
|
|
if op_code in OpCodes.RESET:
|
|
tc_queue.appendleft((QueueCommands.PRINT, "Reset TC scheduler"))
|
|
tc_queue.appendleft(generate_reset_tc_sched_cmd(ssc=0).pack_command_tuple())
|
|
# a TC[11,5] for 3rd inserted ping TC
|
|
# TODO: This should be an independent test
|
|
# a TC[11,6] for some other previously inserted TCs
|
|
# service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
|
|
# current_time+45, current_time+55)
|
|
# tc_queue.appendleft(service_11_6_tc.pack_command_tuple())
|
|
|
|
# TODO: This should be an independent test
|
|
# a TC[11,7] for another previously inserted TC
|
|
# service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4)
|
|
# tc_queue.appendleft(service_11_7_tc.pack_command_tuple())
|
|
|
|
# TODO: This should be an independent test
|
|
# a TC[11,8] with offset time of 20s
|
|
# service_11_8_tc = build_filter_timeshift_tc(
|
|
# 20,
|
|
# TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
|
|
# current_time + 45,
|
|
# current_time + 55,
|
|
# )
|
|
# tc_queue.appendleft((service_11_8_tc.pack_command_tuple()))
|
|
|
|
|
|
# this function packs another TC into an insert activity TC[11,4]
|
|
# parameter: release_time: Absolute time when TC shall be released/run
|
|
# parameter tc_to_insert: The TC which shall be inserted
|
|
def pack_insert_tc_app_data(release_time: bytes, tc_to_insert: PusTelecommand) -> bytes:
|
|
app_data = bytearray()
|
|
# pack the release time
|
|
app_data.extend(release_time)
|
|
# followed by the tc
|
|
app_data.extend(tc_to_insert.pack())
|
|
return app_data
|
|
|
|
|
|
def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes:
|
|
return TcSchedReqId.build_from_tc(tc).pack()
|
|
|
|
|
|
def build_filter_delete_tc(
|
|
time_window_type: TypeOfTimeWindow, *timestamps: int
|
|
) -> PusTelecommand:
|
|
app_data = bytearray()
|
|
app_data.extend(struct.pack("!I", int(time_window_type)))
|
|
|
|
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
|
|
for timestamp in timestamps:
|
|
app_data.extend(struct.pack("!I", timestamp))
|
|
|
|
return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161)
|
|
|
|
|
|
def pack_corresponding_timeshift_app_data(
|
|
time_delta: bytes, tc: PusTelecommand, ssc: int
|
|
) -> bytes:
|
|
req_id = TcSchedReqId.build_from_tc(tc)
|
|
app_data = bytearray()
|
|
app_data.extend(time_delta)
|
|
app_data.extend(req_id.pack())
|
|
return app_data
|
|
|
|
|
|
def build_filter_timeshift_tc(
|
|
time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int
|
|
) -> PusTelecommand:
|
|
app_data = bytearray()
|
|
app_data.extend(struct.pack("!I", time_offset))
|
|
app_data.extend(struct.pack("!I", int(time_window_type)))
|
|
|
|
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
|
|
for timestamp in timestamps:
|
|
app_data.extend(struct.pack("!I", timestamp))
|
|
|
|
return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181)
|
|
|
|
|
|
# waits for a specified amount of seconds and prints ". . ." for each second
|
|
def wait_seconds(t: int):
|
|
print("Waiting: ", end="")
|
|
for x in range(t):
|
|
time.sleep(1)
|
|
print(". ", end="")
|
|
|
|
print("")
|