cotinued sched service test

This commit is contained in:
Robin Müller 2022-05-20 11:08:46 +02:00
parent 756b300c31
commit 7d197dbe4b
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
7 changed files with 181 additions and 22 deletions

View File

@ -1,24 +1,33 @@
from typing import Dict, Optional
from typing import Optional, Union
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.utility.obj_id import ObjectIdDictT
from tmtccmd.utility.retval import RetvalDictT
from common_tmtc.config.definitions import TM_SP_IDS
from common_tmtc.pus_tc.cmd_definitions import get_fsfw_service_op_code_dict
from common_tmtc.pus_tc.cmd_definitions import common_fsfw_service_op_code_dict
class FsfwHookBase(TmTcHookBase):
class CommonFsfwHookBase(TmTcHookBase):
def pack_service_queue(
self, service: Union[int, str], op_code: str, service_queue: TcQueueT
):
from common_tmtc.pus_tc.tc_packing import common_service_queue_user
common_service_queue_user(
service=service, op_code=op_code, tc_queue=service_queue
)
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
return get_fsfw_service_op_code_dict()
return common_fsfw_service_op_code_dict()
def assign_communication_interface(
self, com_if_key: str
@ -34,14 +43,7 @@ class FsfwHookBase(TmTcHookBase):
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
print("No custom mode operation implemented")
def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from common_tmtc.pus_tc.tc_packing import pack_service_queue_user
pack_service_queue_user(
service=service, op_code=op_code, tc_queue=service_queue
)
def get_object_ids(self) -> Dict[bytes, list]:
def get_object_ids(self) -> ObjectIdDictT:
from common_tmtc.config.object_ids import get_object_ids
return get_object_ids()

View File

@ -7,7 +7,7 @@ from tmtccmd.config import (
from tmtccmd.config.globals import get_default_service_op_code_dict
def get_fsfw_service_op_code_dict() -> ServiceOpCodeDictT:
def common_fsfw_service_op_code_dict() -> ServiceOpCodeDictT:
service_op_code_dict = get_default_service_op_code_dict()
op_code = dict()
add_op_code_entry(op_code_dict=op_code, keys="test", info="Mode CMD Test")
@ -20,7 +20,16 @@ def get_fsfw_service_op_code_dict() -> ServiceOpCodeDictT:
add_service_op_code_entry(
srv_op_code_dict=service_op_code_dict,
name="200",
info="Mode MGMT",
info="PUS Service 200 Mode MGMT",
op_code_entry=op_code,
)
op_code = dict()
add_op_code_entry(op_code_dict=op_code, keys="test", info="TC Scheduling Test")
add_service_op_code_entry(
srv_op_code_dict=service_op_code_dict,
name="11",
info="PUS Service 11 TC Scheduling",
op_code_entry=op_code,
)
return service_op_code_dict

143
pus_tc/pus_11_tc_sched.py Normal file
View File

@ -0,0 +1,143 @@
import struct
import time
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import QueueCommands
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.pus.pus_11_tc_sched import TypeOfTimeWindow, Subservices, TcSchedReqId
from tmtccmd.tc.definitions import TcQueueT
class OpCodes:
INSERT = ["0", "insert-test"]
DELETE = ["1", "del-test"]
def pack_service_11_commands(op_code: str, tc_queue: TcQueueT):
if op_code in OpCodes.INSERT:
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Time-Tagged Command insertion")
)
current_time = int(round(time.time()))
# these TC[17,1] (ping commands) shall be inserted
ping_tcs = [
pack_service_17_ping_command(1701),
pack_service_17_ping_command(1702),
pack_service_17_ping_command(1703),
pack_service_17_ping_command(1704),
pack_service_17_ping_command(1705),
]
for idx, tc in enumerate(ping_tcs):
release_time = current_time + 20 + idx * 10
tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_insert_tc_app_data(release_time, tc),
)
tc_queue.appendleft(tc.pack_command_tuple())
if op_code in ["0", "del-test"]:
tc_queue.appendleft(
(QueueCommands.PRINT, "Testing Time-Tagged Command deletion")
)
current_time = int(round(time.time()))
tc_to_schedule = pack_service_17_ping_command(1703)
time_tagged_tc = PusTelecommand(
service=11,
subservice=Subservices.TC_INSERT,
app_data=pack_insert_tc_app_data(current_time + 20, tc_to_schedule),
)
tc_queue.appendleft(time_tagged_tc.pack_command_tuple())
del_time_tagged_tcs = PusTelecommand(
service=11,
subservice=Subservices.TC_DELETE,
app_data=pack_delete_corresponding_tc_app_data(tc=tc_to_schedule),
ssc=1105,
)
# a TC[11,5] for 3rd inserted ping TC
tc_queue.appendleft(del_time_tagged_tcs.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,6] for some other previously inserted TCs
# service_11_6_tc = build_filter_delete_tc(TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time+45, current_time+55)
# tc_queue.appendleft(service_11_6_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,7] for another previously inserted TC
# service_11_7_tc = build_corresponding_timeshift_tc(30, ping_tc_4)
# tc_queue.appendleft(service_11_7_tc.pack_command_tuple())
# TODO: This should be an independent test
# a TC[11,8] with offset time of 20s
# service_11_8_tc = build_filter_timeshift_tc(
# 20,
# TypeOfTimeWindow.FROM_TIMETAG_TO_TIMETAG,
# current_time + 45,
# current_time + 55,
# )
# tc_queue.appendleft((service_11_8_tc.pack_command_tuple()))
# this function packs another TC into an insert activity TC[11,4]
# parameter: release_time: Absolute time when TC shall be released/run
# parameter tc_to_insert: The TC which shall be inserted
def pack_insert_tc_app_data(release_time: int, tc_to_insert: PusTelecommand) -> bytes:
app_data = bytearray()
# pack the release time
app_data.extend(struct.pack("!I", release_time))
# followed by the tc
app_data.extend(tc_to_insert.pack())
return app_data
def pack_delete_corresponding_tc_app_data(tc: PusTelecommand) -> bytes:
return TcSchedReqId.build_from_tc(tc).pack()
def build_filter_delete_tc(
time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=6, app_data=app_data, ssc=1161)
def pack_corresponding_timeshift_app_data(
time_delta: bytes, tc: PusTelecommand, ssc: int
) -> bytes:
req_id = TcSchedReqId.build_from_tc(tc)
app_data = bytearray()
app_data.extend(time_delta)
app_data.extend(req_id.pack())
return app_data
def build_filter_timeshift_tc(
time_offset: int, time_window_type: TypeOfTimeWindow, *timestamps: int
) -> PusTelecommand:
app_data = bytearray()
app_data.extend(struct.pack("!I", time_offset))
app_data.extend(struct.pack("!I", int(time_window_type)))
if time_window_type != TypeOfTimeWindow.SELECT_ALL:
for timestamp in timestamps:
app_data.extend(struct.pack("!I", timestamp))
return PusTelecommand(service=11, subservice=8, app_data=app_data, ssc=1181)
# waits for a specified amount of seconds and prints ". . ." for each second
def wait_seconds(t: int):
print("Waiting: ", end="")
for x in range(t):
time.sleep(1)
print(". ", end="")
print("")

View File

@ -9,6 +9,8 @@ from collections import deque
from typing import Union
from spacepackets.ecss.tc import PusTelecommand
from common_tmtc.pus_tc.pus_11_tc_sched import pack_service_11_commands
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.logging import get_console_logger, get_current_time_string
from tmtccmd.logging.pus import log_raw_pus_tc
@ -20,9 +22,9 @@ from tmtccmd.pus.pus_17_test import pack_generic_service17_test
from common_tmtc.pus_tc.service_20_parameters import pack_service20_commands_into
from common_tmtc.pus_tc.service_2_raw_cmd import pack_service_2_commands_into
from common_tmtc.pus_tc.service_3_housekeeping import pack_service_3_commands_into
from common_tmtc.pus_tc.service_17_test import pack_service_17_commands
from common_tmtc.pus_tc.pus_17_test import pack_service_17_commands
from common_tmtc.pus_tc.service_8_func_cmd import pack_service_8_commands_into
from common_tmtc.pus_tc.service_200_mode import pack_service_200_commands_into
from common_tmtc.pus_tc.pus_200_mode import pack_service_200_commands_into
LOGGER = get_console_logger()
@ -47,7 +49,9 @@ def pre_tc_send_cb(
file_logger.info(queue_info)
def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: TcQueueT):
def common_service_queue_user(
service: Union[str, int], op_code: str, tc_queue: TcQueueT
):
if service == CoreServiceList.SERVICE_2.value:
return pack_service_2_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_3.value:
@ -56,6 +60,8 @@ def pack_service_queue_user(service: Union[str, int], op_code: str, tc_queue: Tc
return pack_generic_service5_test_into(tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_8.value:
return pack_service_8_commands_into(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_11.value:
return pack_service_11_commands(op_code=op_code, tc_queue=tc_queue)
if service == CoreServiceList.SERVICE_17.value:
return pack_service_17_commands(op_code=op_code, tc_queue=tc_queue, init_ssc=0)
if service == CoreServiceList.SERVICE_20.value:

View File

@ -33,20 +33,19 @@ except ImportError as error:
from common_tmtc.config import __version__
from common_tmtc.config.definitions import PUS_APID
from common_tmtc.config.hook_implementation import FsfwHookBase
from common_tmtc.config.hook_implementation import CommonFsfwHookBase
from common_tmtc.pus_tm.factory_hook import ccsds_tm_handler
from common_tmtc.pus_tc.tc_packing import pre_tc_send_cb
def tmtcc_pre_args() -> FsfwHookBase:
def tmtcc_pre_args():
print(f"-- eive tmtc v{__version__} --")
print(f"-- spacepackets v{spacepackets.__version__} --")
tmtccmd.init_printout(False)
return FsfwHookBase(json_cfg_path=default_json_path())
def tmtcc_post_args(
hook_obj: FsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace]
hook_obj: CommonFsfwHookBase, use_gui: bool, args: Optional[argparse.Namespace]
):
setup_args = SetupArgs(
hook_obj=hook_obj, use_gui=use_gui, apid=PUS_APID, cli_args=args