From 3b94a125ef14052779ae773025b51cd10117fc18 Mon Sep 17 00:00:00 2001 From: lkoester Date: Tue, 9 Apr 2024 11:18:54 +0200 Subject: [PATCH 1/4] added pus components and ping --- .gitignore | 1 + Cargo.lock | 115 ++++- Cargo.toml | 8 + ops-sat-rs-tmtc/.gitignore | 9 + ops-sat-rs-tmtc/common.py | 51 +++ ops-sat-rs-tmtc/main.py | 277 ++++++++++++ ops-sat-rs-tmtc/pus_tc.py | 143 ++++++ ops-sat-rs-tmtc/pus_tm.py | 0 ops-sat-rs-tmtc/requirements.txt | 2 + ops-sat-rs-tmtc/tc_definitions.py | 38 ++ ops-sat-rs-tmtc/tmtc_conf.json | 8 + src/config.rs | 53 +++ src/interface/mod.rs | 2 + src/{ => interface}/tcp.rs | 0 src/{ => interface}/udp.rs | 0 src/lib.rs | 31 ++ src/main.rs | 117 ++++- src/pus/mod.rs | 707 ++++++++++++++++++++++++++++++ src/pus/stack.rs | 75 ++++ src/pus/test.rs | 127 ++++++ src/requests.rs | 152 +++++++ src/tm_funnel.rs | 157 +++++++ src/tmtc.rs | 23 +- 23 files changed, 2081 insertions(+), 15 deletions(-) create mode 100644 ops-sat-rs-tmtc/.gitignore create mode 100644 ops-sat-rs-tmtc/common.py create mode 100644 ops-sat-rs-tmtc/main.py create mode 100644 ops-sat-rs-tmtc/pus_tc.py create mode 100644 ops-sat-rs-tmtc/pus_tm.py create mode 100644 ops-sat-rs-tmtc/requirements.txt create mode 100644 ops-sat-rs-tmtc/tc_definitions.py create mode 100644 ops-sat-rs-tmtc/tmtc_conf.json create mode 100644 src/interface/mod.rs rename src/{ => interface}/tcp.rs (100%) rename src/{ => interface}/udp.rs (100%) create mode 100644 src/pus/mod.rs create mode 100644 src/pus/stack.rs create mode 100644 src/pus/test.rs create mode 100644 src/requests.rs create mode 100644 src/tm_funnel.rs diff --git a/.gitignore b/.gitignore index 2fa16f8..afa321b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ /.vscode # Ignore this, may include user specific paths. /.cargo/config.toml +output.log \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f36afba..4d5e3bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,15 @@ dependencies = [ "libc", ] +[[package]] +name = "array-init" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23589ecb866b460d3a0f1278834750268c607e8e28a1b982c907219f3178cd72" +dependencies = [ + "nodrop", +] + [[package]] name = "autocfg" version = "1.2.0" @@ -138,6 +147,27 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "delegate" version = "0.10.0" @@ -149,6 +179,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive-new" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "downcast-rs" version = "1.2.1" @@ -231,6 +272,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "js-sys" version = "0.3.69" @@ -258,12 +305,24 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "nodrop" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" + [[package]] name = "num-traits" version = "0.2.18" @@ -315,11 +374,15 @@ name = "ops-sat-rs" version = "0.1.0" dependencies = [ "chrono", + "derive-new", "fern", "lazy_static", "log", + "num_enum", "satrs", + "satrs-mib", "strum", + "thiserror", ] [[package]] @@ -331,7 +394,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "smallvec", + "smallvec 1.13.2", "windows-targets 0.48.5", ] @@ -383,6 +446,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" +[[package]] +name = "ryu" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" + [[package]] name = "satrs" version = "0.2.0-rc.0" @@ -401,12 +470,34 @@ dependencies = [ "paste", "satrs-shared", "serde", - "smallvec", + "smallvec 1.13.2", "socket2", "spacepackets", "thiserror", ] +[[package]] +name = "satrs-mib" +version = "0.1.1" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +dependencies = [ + "csv", + "satrs-mib-codegen", + "satrs-shared", + "serde", + "serde-hex", +] + +[[package]] +name = "satrs-mib-codegen" +version = "0.1.1" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "satrs-shared" version = "0.1.3" @@ -425,6 +516,17 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-hex" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca37e3e4d1b39afd7ff11ee4e947efae85adfddf4841787bfa47c470e96dc26d" +dependencies = [ + "array-init", + "serde", + "smallvec 0.6.14", +] + [[package]] name = "serde_derive" version = "1.0.197" @@ -436,6 +538,15 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "smallvec" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" +dependencies = [ + "maybe-uninit", +] + [[package]] name = "smallvec" version = "1.13.2" diff --git a/Cargo.toml b/Cargo.toml index 40a8bbe..17f5bc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,16 @@ chrono = "0.4" log = "0.4" lazy_static = "1" strum = { version = "0.26", features = ["derive"] } +thiserror = "1" +derive-new = "0.6" +num_enum = "0.7" [dependencies.satrs] version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" + +[dependencies.satrs-mib] +version = "0.1.1" +git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" +branch = "main" diff --git a/ops-sat-rs-tmtc/.gitignore b/ops-sat-rs-tmtc/.gitignore new file mode 100644 index 0000000..d994678 --- /dev/null +++ b/ops-sat-rs-tmtc/.gitignore @@ -0,0 +1,9 @@ +__pycache__ + +/venv +/log +/.idea/* +!/.idea/runConfigurations + +/seqcnt.txt +/.tmtc-history.txt diff --git a/ops-sat-rs-tmtc/common.py b/ops-sat-rs-tmtc/common.py new file mode 100644 index 0000000..6f56604 --- /dev/null +++ b/ops-sat-rs-tmtc/common.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import dataclasses +import enum +import struct + + +class Apid(enum.IntEnum): + SCHED = 1 + GENERIC_PUS = 2 + ACS = 3 + CFDP = 4 + + +class EventSeverity(enum.IntEnum): + INFO = 0 + LOW = 1 + MEDIUM = 2 + HIGH = 3 + + +@dataclasses.dataclass +class EventU32: + severity: EventSeverity + group_id: int + unique_id: int + + @classmethod + def unpack(cls, data: bytes) -> EventU32: + if len(data) < 4: + raise ValueError("passed data too short") + event_raw = struct.unpack("!I", data[0:4])[0] + return cls( + severity=EventSeverity((event_raw >> 30) & 0b11), + group_id=(event_raw >> 16) & 0x3FFF, + unique_id=event_raw & 0xFFFF, + ) + + +class AcsId(enum.IntEnum): + MGM_0 = 0 + + +class AcsHkIds(enum.IntEnum): + MGM_SET = 1 + + +def make_addressable_id(target_id: int, unique_id: int) -> bytes: + byte_string = bytearray(struct.pack("!I", target_id)) + byte_string.extend(struct.pack("!I", unique_id)) + return byte_string diff --git a/ops-sat-rs-tmtc/main.py b/ops-sat-rs-tmtc/main.py new file mode 100644 index 0000000..a3e0caf --- /dev/null +++ b/ops-sat-rs-tmtc/main.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +"""Example client for the sat-rs example application""" +import logging +import sys +import time +from typing import Any, Optional +from prompt_toolkit.history import History +from prompt_toolkit.history import FileHistory + +from spacepackets.ccsds import PacketId, PacketType +import tmtccmd +from spacepackets.ecss import PusTelemetry, PusVerificator +from spacepackets.ecss.pus_17_test import Service17Tm +from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm +from spacepackets.ccsds.time import CdsShortTimestamp + +from tmtccmd import TcHandlerBase, ProcedureParamsWrapper +from tmtccmd.core.base import BackendRequest +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tmtc import CcsdsTmHandler, GenericApidHandlerBase +from tmtccmd.com import ComInterface +from tmtccmd.config import ( + CmdTreeNode, + default_json_path, + SetupParams, + HookBase, + params_to_procedure_conversion, +) +from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper +from tmtccmd.logging import add_colorlog_console_logger +from tmtccmd.logging.pus import ( + RegularTmtcLogWrapper, + RawTmtcTimedLogWrapper, + TimedLogWhen, +) +from tmtccmd.tmtc import ( + TcQueueEntryType, + ProcedureWrapper, + TcProcedureType, + FeedWrapper, + SendCbParams, + DefaultPusQueueHelper, + QueueWrapper, +) +from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider +from tmtccmd.util.obj_id import ObjectIdDictT + + +import pus_tc +from common import Apid, EventU32 + +_LOGGER = logging.getLogger() + + +class SatRsConfigHook(HookBase): + def __init__(self, json_cfg_path: str): + super().__init__(json_cfg_path=json_cfg_path) + + def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + from tmtccmd.config.com import ( + create_com_interface_default, + create_com_interface_cfg_default, + ) + + assert self.cfg_path is not None + packet_id_list = [] + for apid in Apid: + packet_id_list.append(PacketId(PacketType.TM, True, apid)) + cfg = create_com_interface_cfg_default( + com_if_key=com_if_key, + json_cfg_path=self.cfg_path, + space_packet_ids=packet_id_list, + ) + assert cfg is not None + return create_com_interface_default(cfg) + + def get_command_definitions(self) -> CmdTreeNode: + """This function should return the root node of the command definition tree.""" + return pus_tc.create_cmd_definition_tree() + + def get_cmd_history(self) -> Optional[History]: + """Optionlly return a history class for the past command paths which will be used + when prompting a command path from the user in CLI mode.""" + return FileHistory(".tmtc-history.txt") + + def get_object_ids(self) -> ObjectIdDictT: + from tmtccmd.config.objects import get_core_object_ids + + return get_core_object_ids() + + +class PusHandler(GenericApidHandlerBase): + def __init__( + self, + file_logger: logging.Logger, + verif_wrapper: VerificationWrapper, + raw_logger: RawTmtcTimedLogWrapper, + ): + super().__init__(None) + self.file_logger = file_logger + self.raw_logger = raw_logger + self.verif_wrapper = verif_wrapper + + def handle_tm(self, apid: int, packet: bytes, _user_args: Any): + try: + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + except ValueError as e: + _LOGGER.warning("Could not generate PUS TM object from raw data") + _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") + raise e + service = pus_tm.service + if service == 1: + tm_packet = Service1Tm.unpack( + data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) + ) + res = self.verif_wrapper.add_tm(tm_packet) + if res is None: + _LOGGER.info( + f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " + f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" + ) + _LOGGER.warning( + f"No matching telecommand found for {tm_packet.tc_req_id}" + ) + else: + self.verif_wrapper.log_to_console(tm_packet, res) + self.verif_wrapper.log_to_file(tm_packet, res) + elif service == 3: + _LOGGER.info("No handling for HK packets implemented") + _LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]") + pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty()) + if pus_tm.subservice == 25: + if len(pus_tm.source_data) < 8: + raise ValueError("No addressable ID in HK packet") + json_str = pus_tm.source_data[8:] + _LOGGER.info(json_str) + elif service == 5: + tm_packet = PusTelemetry.unpack( + packet, time_reader=CdsShortTimestamp.empty() + ) + src_data = tm_packet.source_data + event_u32 = EventU32.unpack(src_data) + _LOGGER.info(f"Received event packet. Event: {event_u32}") + if event_u32.group_id == 0 and event_u32.unique_id == 0: + _LOGGER.info("Received test event") + elif service == 17: + tm_packet = Service17Tm.unpack( + packet, time_reader=CdsShortTimestamp.empty() + ) + if tm_packet.subservice == 2: + self.file_logger.info("Received Ping Reply TM[17,2]") + _LOGGER.info("Received Ping Reply TM[17,2]") + else: + self.file_logger.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + _LOGGER.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + else: + _LOGGER.info( + f"The service {service} is not implemented in Telemetry Factory" + ) + tm_packet = PusTelemetry.unpack( + packet, time_reader=CdsShortTimestamp.empty() + ) + self.raw_logger.log_tm(pus_tm) + + +class TcHandler(TcHandlerBase): + def __init__( + self, + seq_count_provider: FileSeqCountProvider, + verif_wrapper: VerificationWrapper, + ): + super(TcHandler, self).__init__() + self.seq_count_provider = seq_count_provider + self.verif_wrapper = verif_wrapper + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, + seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=None, + ) + + def send_cb(self, send_params: SendCbParams): + entry_helper = send_params.entry + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + raw_tc = pus_tc_wrapper.pus_tc.pack() + _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") + send_params.com_if.send(raw_tc) + elif entry_helper.entry_type == TcQueueEntryType.LOG: + log_entry = entry_helper.to_log_entry() + _LOGGER.info(log_entry.log_str) + + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") + + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): + q = self.queue_helper + q.queue_wrapper = wrapper.queue_wrapper + if info.proc_type == TcProcedureType.DEFAULT: + def_proc = info.to_def_procedure() + assert def_proc.cmd_path is not None + pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) + + +def main(): + add_colorlog_console_logger(_LOGGER) + tmtccmd.init_printout(False) + hook_obj = SatRsConfigHook(json_cfg_path=default_json_path()) + parser_wrapper = PreArgsParsingWrapper() + parser_wrapper.create_default_parent_parser() + parser_wrapper.create_default_parser() + parser_wrapper.add_def_proc_args() + params = SetupParams() + post_args_wrapper = parser_wrapper.parse(hook_obj, params) + proc_wrapper = ProcedureParamsWrapper() + if post_args_wrapper.use_gui: + post_args_wrapper.set_params_without_prompts(proc_wrapper) + else: + post_args_wrapper.set_params_with_prompts(proc_wrapper) + setup_args = SetupWrapper( + hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper + ) + # Create console logger helper and file loggers + tmtc_logger = RegularTmtcLogWrapper() + file_logger = tmtc_logger.logger + raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1) + verificator = PusVerificator() + verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger) + # Create primary TM handler and add it to the CCSDS Packet Handler + tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger) + ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler) + # TODO: We could add the CFDP handler for the CFDP APID at a later stage. + # ccsds_handler.add_apid_handler(tm_handler) + + # Create TC handler + seq_count_provider = PusFileSeqCountProvider() + tc_handler = TcHandler(seq_count_provider, verification_wrapper) + tmtccmd.setup(setup_args=setup_args) + init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper) + tmtc_backend = tmtccmd.create_default_tmtc_backend( + setup_wrapper=setup_args, + tm_handler=ccsds_handler, + tc_handler=tc_handler, + init_procedure=init_proc, + ) + tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj) + try: + while True: + state = tmtc_backend.periodic_op(None) + if state.request == BackendRequest.TERMINATION_NO_ERROR: + sys.exit(0) + elif state.request == BackendRequest.DELAY_IDLE: + _LOGGER.info("TMTC Client in IDLE mode") + time.sleep(3.0) + elif state.request == BackendRequest.DELAY_LISTENER: + time.sleep(0.8) + elif state.request == BackendRequest.DELAY_CUSTOM: + if state.next_delay.total_seconds() <= 0.4: + time.sleep(state.next_delay.total_seconds()) + else: + time.sleep(0.4) + elif state.request == BackendRequest.CALL_NEXT: + pass + except KeyboardInterrupt: + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/ops-sat-rs-tmtc/pus_tc.py b/ops-sat-rs-tmtc/pus_tc.py new file mode 100644 index 0000000..b0febdc --- /dev/null +++ b/ops-sat-rs-tmtc/pus_tc.py @@ -0,0 +1,143 @@ +import datetime +import struct +import logging + +from spacepackets.ccsds import CdsShortTimestamp +from spacepackets.ecss import PusTelecommand +from tmtccmd.config import CmdTreeNode +from tmtccmd.pus.tc.s200_fsfw_mode import Mode +from tmtccmd.tmtc import DefaultPusQueueHelper +from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd +from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice + +from common import AcsId, Apid + +_LOGGER = logging.getLogger(__name__) + + +def create_set_mode_cmd( + apid: int, unique_id: int, mode: int, submode: int +) -> PusTelecommand: + app_data = bytearray() + app_data.extend(struct.pack("!I", unique_id)) + app_data.extend(struct.pack("!I", mode)) + app_data.extend(struct.pack("!H", submode)) + return PusTelecommand( + service=200, + subservice=ModeSubservice.TC_MODE_COMMAND, + apid=apid, + app_data=app_data, + ) + + +def create_cmd_definition_tree() -> CmdTreeNode: + + root_node = CmdTreeNode.root_node() + + hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True) + hk_node.add_child(CmdTreeNode("one_shot_hk", "Request One Shot HK set")) + hk_node.add_child( + CmdTreeNode("enable", "Enable periodic housekeeping data generation") + ) + hk_node.add_child( + CmdTreeNode("disable", "Disable periodic housekeeping data generation") + ) + + mode_node = CmdTreeNode("mode", "Mode Node", hide_children_for_print=True) + set_mode_node = CmdTreeNode( + "set_mode", "Set Node", hide_children_which_are_leaves=True + ) + set_mode_node.add_child(CmdTreeNode("off", "Set OFF Mode")) + set_mode_node.add_child(CmdTreeNode("on", "Set ON Mode")) + set_mode_node.add_child(CmdTreeNode("normal", "Set NORMAL Mode")) + mode_node.add_child(set_mode_node) + mode_node.add_child(CmdTreeNode("read_mode", "Read Mode")) + + test_node = CmdTreeNode("test", "Test Node") + test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC")) + test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event")) + root_node.add_child(test_node) + + scheduler_node = CmdTreeNode("scheduler", "Scheduler Node") + scheduler_node.add_child( + CmdTreeNode( + "schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds" + ) + ) + root_node.add_child(scheduler_node) + + acs_node = CmdTreeNode("acs", "ACS Subsystem Node") + mgm_node = CmdTreeNode("mgms", "MGM devices node") + mgm_node.add_child(mode_node) + mgm_node.add_child(hk_node) + + acs_node.add_child(mgm_node) + root_node.add_child(acs_node) + + return root_node + + +def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): + # It should always be at least the root path "/", so we split of the empty portion left of it. + cmd_path_list = cmd_path.split("/")[1:] + if len(cmd_path_list) == 0: + _LOGGER.warning("empty command path") + return + if cmd_path_list[0] == "test": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "ping": + q.add_log_cmd("Sending PUS ping telecommand") + return q.add_pus_tc( + PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1) + ) + elif cmd_path_list[1] == "trigger_event": + q.add_log_cmd("Triggering test event") + return q.add_pus_tc( + PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128) + ) + if cmd_path_list[0] == "scheduler": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "schedule_ping_10_secs_ahead": + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd( + time_stamp, + PusTelecommand(service=17, subservice=1), + apid=Apid.SCHED, + ) + ) + if cmd_path_list[0] == "acs": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "mgms": + assert len(cmd_path_list) >= 3 + if cmd_path_list[2] == "hk": + if cmd_path_list[3] == "one_shot_hk": + q.add_log_cmd("Sending HK one shot request") + # TODO: Fix + # q.add_pus_tc( + # create_request_one_hk_command( + # make_addressable_id(Apid.ACS, AcsId.MGM_SET) + # ) + # ) + if cmd_path_list[2] == "mode": + if cmd_path_list[3] == "set_mode": + handle_set_mode_cmd( + q, "MGM 0", cmd_path_list[4], Apid.ACS, AcsId.MGM_0 + ) + + +def handle_set_mode_cmd( + q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int +): + if mode_str == "off": + q.add_log_cmd(f"Sending Mode OFF to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0)) + elif mode_str == "on": + q.add_log_cmd(f"Sending Mode ON to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0)) + elif mode_str == "normal": + q.add_log_cmd(f"Sending Mode NORMAL to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0)) diff --git a/ops-sat-rs-tmtc/pus_tm.py b/ops-sat-rs-tmtc/pus_tm.py new file mode 100644 index 0000000..e69de29 diff --git a/ops-sat-rs-tmtc/requirements.txt b/ops-sat-rs-tmtc/requirements.txt new file mode 100644 index 0000000..b3f6f2a --- /dev/null +++ b/ops-sat-rs-tmtc/requirements.txt @@ -0,0 +1,2 @@ +tmtccmd == 8.0.0rc1 +# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd diff --git a/ops-sat-rs-tmtc/tc_definitions.py b/ops-sat-rs-tmtc/tc_definitions.py new file mode 100644 index 0000000..74fbff8 --- /dev/null +++ b/ops-sat-rs-tmtc/tc_definitions.py @@ -0,0 +1,38 @@ +from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper, CoreServiceList +from tmtccmd.config.globals import get_default_tmtc_defs + +from common import HkOpCodes + + +def tc_definitions() -> TmtcDefinitionWrapper: + defs = get_default_tmtc_defs() + srv_5 = OpCodeEntry() + srv_5.add("0", "Event Test") + defs.add_service( + name=CoreServiceList.SERVICE_5.value, + info="PUS Service 5 Event", + op_code_entry=srv_5, + ) + srv_17 = OpCodeEntry() + srv_17.add("ping", "Ping Test") + srv_17.add("trigger_event", "Trigger Event") + defs.add_service( + name=CoreServiceList.SERVICE_17_ALT, + info="PUS Service 17 Test", + op_code_entry=srv_17, + ) + srv_3 = OpCodeEntry() + srv_3.add(HkOpCodes.GENERATE_ONE_SHOT, "Generate AOCS one shot HK") + defs.add_service( + name=CoreServiceList.SERVICE_3, + info="PUS Service 3 Housekeeping", + op_code_entry=srv_3, + ) + srv_11 = OpCodeEntry() + srv_11.add("0", "Scheduled TC Test") + defs.add_service( + name=CoreServiceList.SERVICE_11, + info="PUS Service 11 TC Scheduling", + op_code_entry=srv_11, + ) + return defs diff --git a/ops-sat-rs-tmtc/tmtc_conf.json b/ops-sat-rs-tmtc/tmtc_conf.json new file mode 100644 index 0000000..f2c8afd --- /dev/null +++ b/ops-sat-rs-tmtc/tmtc_conf.json @@ -0,0 +1,8 @@ +{ + "com_if": "tcp", + "tcpip_udp_ip_addr": "127.0.0.1", + "tcpip_udp_port": 7301, + "tcpip_udp_recv_max_size": 1500, + "tcpip_tcp_ip_addr": "127.0.0.1", + "tcpip_tcp_port": 7301 +} diff --git a/src/config.rs b/src/config.rs index 72a6ece..53a54df 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,9 @@ use lazy_static::lazy_static; use satrs::spacepackets::{PacketId, PacketType}; use std::{collections::HashSet, net::Ipv4Addr}; use strum::IntoEnumIterator; +use satrs_mib::resultcode; +use satrs_mib::res_code::ResultU16Info; +use num_enum::{IntoPrimitive, TryFromPrimitive}; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; @@ -23,6 +26,56 @@ lazy_static! { }; } +#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] +#[repr(u8)] +pub enum CustomPusServiceId { + Mode = 200, + Health = 201, +} + +#[derive(Debug)] +pub enum GroupId { + Tmtc = 0, + Hk = 1, + Mode = 2, +} + +pub mod tmtc_err { + use satrs::res_code::ResultU16; + use super::*; + + #[resultcode] + pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 0); + #[resultcode] + pub const INVALID_PUS_SUBSERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 1); + #[resultcode] + pub const PUS_SERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 2); + #[resultcode] + pub const PUS_SUBSERVICE_NOT_IMPLEMENTED: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 3); + #[resultcode] + pub const UNKNOWN_TARGET_ID: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 4); + #[resultcode] + pub const ROUTING_ERROR: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 5); + #[resultcode(info = "Request timeout for targeted PUS request. P1: Request ID. P2: Target ID")] + pub const REQUEST_TIMEOUT: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 6); + + #[resultcode( + info = "Not enough data inside the TC application data field. Optionally includes: \ + 8 bytes of failure data containing 2 failure parameters, \ + P1 (u32 big endian): Expected data length, P2: Found data length" + )] + pub const NOT_ENOUGH_APP_DATA: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 2); + + pub const TMTC_RESULTS: &[ResultU16Info] = &[ + INVALID_PUS_SERVICE_EXT, + INVALID_PUS_SUBSERVICE_EXT, + PUS_SERVICE_NOT_IMPLEMENTED_EXT, + UNKNOWN_TARGET_ID_EXT, + ROUTING_ERROR_EXT, + NOT_ENOUGH_APP_DATA_EXT, + ]; +} + pub mod components { use satrs::request::UniqueApidTargetId; use strum::EnumIter; diff --git a/src/interface/mod.rs b/src/interface/mod.rs new file mode 100644 index 0000000..f7a6a76 --- /dev/null +++ b/src/interface/mod.rs @@ -0,0 +1,2 @@ +pub mod tcp; +pub mod udp; diff --git a/src/tcp.rs b/src/interface/tcp.rs similarity index 100% rename from src/tcp.rs rename to src/interface/tcp.rs diff --git a/src/udp.rs b/src/interface/udp.rs similarity index 100% rename from src/udp.rs rename to src/interface/udp.rs diff --git a/src/lib.rs b/src/lib.rs index b957951..ef68b47 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,34 @@ use std::net::Ipv4Addr; +use satrs::spacepackets::time::cds::CdsTime; +use satrs::spacepackets::time::TimeWriter; pub mod config; + +pub struct TimeStampHelper { + stamper: CdsTime, + time_stamp: [u8; 7], +} + +impl TimeStampHelper { + pub fn stamp(&self) -> &[u8] { + &self.time_stamp + } + + pub fn update_from_now(&mut self) { + self.stamper + .update_from_now() + .expect("Updating timestamp failed"); + self.stamper + .write_to_bytes(&mut self.time_stamp) + .expect("Writing timestamp failed"); + } +} + +impl Default for TimeStampHelper { + fn default() -> Self { + Self { + stamper: CdsTime::now_with_u16_days().expect("creating time stamper failed"), + time_stamp: Default::default(), + } + } +} diff --git a/src/main.rs b/src/main.rs index c53fbf4..60a36d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,37 +13,113 @@ use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, tmtc::CcsdsDistributor, }; +use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use crate::{ ccsds::CcsdsReceiver, logger::setup_logger, - tcp::{SyncTcpTmSource, TcpTask}, + interface::tcp::{SyncTcpTmSource, TcpTask}, tmtc::PusTcSourceProviderDynamic, - udp::{DynamicUdpTmHandler, UdpTmtcServer}, + interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, }; +use crate::pus::{PusReceiver, PusTcMpscRouter}; +use crate::pus::stack::PusStack; +use crate::pus::test::create_test_service_dynamic; +use crate::requests::GenericRequestRouter; +use crate::tm_funnel::TmFunnelDynamic; +use crate::tmtc::TcSourceTaskDynamic; mod ccsds; mod logger; -mod tcp; mod tmtc; -mod udp; +mod requests; +mod pus; +mod tm_funnel; +mod interface; +#[allow(dead_code)] fn main() { setup_logger().expect("setting up logging with fern failed"); println!("OPS-SAT Rust experiment OBSW"); let (tc_source_tx, tc_source_rx) = mpsc::channel(); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); let (tm_server_tx, tm_server_rx) = mpsc::channel(); let tc_source = PusTcSourceProviderDynamic(tc_source_tx); + let (pus_test_tx, pus_test_rx) = mpsc::channel(); + // let (pus_event_tx, pus_event_rx) = mpsc::channel(); + // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); + // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); + // let (pus_action_tx, pus_action_rx) = mpsc::channel(); + // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); + + // let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); + // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + + let pus_router = PusTcMpscRouter { + test_tc_sender: pus_test_tx, + // event_tc_sender: pus_event_tx, + // sched_tc_sender: pus_sched_tx, + // hk_tc_sender: pus_hk_tx, + // action_tc_sender: pus_action_tx, + // mode_tc_sender: pus_mode_tx, + }; + + let pus_test_service = create_test_service_dynamic( + tm_funnel_tx.clone(), + // event_handler.clone_event_sender(), + pus_test_rx, + ); + // let pus_scheduler_service = create_scheduler_service_dynamic( + // tm_funnel_tx.clone(), + // tc_source.0.clone(), + // pus_sched_rx, + // create_sched_tc_pool(), + // ); + // + // let pus_event_service = + // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); + // let pus_action_service = create_action_service_dynamic( + // tm_funnel_tx.clone(), + // pus_action_rx, + // request_map.clone(), + // pus_action_reply_rx, + // ); + // let pus_hk_service = create_hk_service_dynamic( + // tm_funnel_tx.clone(), + // pus_hk_rx, + // request_map.clone(), + // pus_hk_reply_rx, + // ); + // let pus_mode_service = create_mode_service_dynamic( + // tm_funnel_tx.clone(), + // pus_mode_rx, + // request_map, + // pus_mode_reply_rx, + // ); + let mut pus_stack = PusStack::new( + pus_test_service, + // pus_hk_service, + // pus_event_service, + // pus_action_service, + // pus_scheduler_service, + // pus_mode_service, + ); + let ccsds_receiver = CcsdsReceiver { tc_source }; + let mut tmtc_task = TcSourceTaskDynamic::new( + tc_source_rx, + PusReceiver::new(tm_funnel_tx.clone(), pus_router), + ); + let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) .expect("creating UDP TMTC server failed"); - let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_handler: DynamicUdpTmHandler { @@ -60,7 +136,9 @@ fn main() { tcp_ccsds_distributor, PACKET_ID_VALIDATOR.clone(), ) - .expect("tcp server creation failed"); + .expect("tcp server creation failed"); + + let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); info!("Starting TMTC and UDP task"); let jh_udp_tmtc = thread::Builder::new() @@ -69,7 +147,7 @@ fn main() { info!("Running UDP server on port {SERVER_PORT}"); loop { udp_tmtc_server.periodic_operation(); - // tmtc_task.periodic_operation(); + tmtc_task.periodic_operation(); thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); } }) @@ -86,10 +164,33 @@ fn main() { }) .unwrap(); + info!("Starting TM funnel task"); + let jh_tm_funnel = thread::Builder::new() + .name("TM Funnel".to_string()) + .spawn(move || loop { + tm_funnel.operation(); + }) + .unwrap(); + + info!("Starting PUS handler thread"); + let jh_pus_handler = thread::Builder::new() + .name("PUS".to_string()) + .spawn(move || loop { + pus_stack.periodic_operation(); + thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); + }) + .unwrap(); + jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); jh_tcp .join() .expect("Joining TCP TMTC server thread failed"); -} + jh_tm_funnel + .join() + .expect("Joining TM Funnel thread failed"); + jh_pus_handler + .join() + .expect("Joining PUS handler thread failed"); +} \ No newline at end of file diff --git a/src/pus/mod.rs b/src/pus/mod.rs new file mode 100644 index 0000000..d32629d --- /dev/null +++ b/src/pus/mod.rs @@ -0,0 +1,707 @@ +pub mod test; +pub mod stack; + +use crate::requests::GenericRequestRouter; +use log::warn; +use satrs::pus::verification::{ + self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, + VerificationReporterCfg, VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ + ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, + EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError, + GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, + PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, +}; +use satrs::queue::GenericReceiveError; +use satrs::request::{Apid, GenericMessage, MessageMetadata}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::PusServiceId; +use satrs::ComponentId; +use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; +use ops_sat_rs::TimeStampHelper; +use std::fmt::Debug; +use std::sync::mpsc::{self, Sender}; +use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; +use crate::tmtc::MpscStoreAndSendError; + +// pub mod action; +// pub mod event; +// pub mod hk; +// pub mod mode; +// pub mod scheduler; +// pub mod stack; +// pub mod test; + +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum HandlingStatus { + Empty, + HandledOne, +} + +pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> VerificationReporter { + let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, 8).unwrap(); + // Every software component which needs to generate verification telemetry, gets a cloned + // verification reporter. + VerificationReporter::new(owner_id, &verif_cfg) +} + +/// Simple router structure which forwards PUS telecommands to dedicated handlers. +pub struct PusTcMpscRouter { + pub test_tc_sender: Sender, + // pub event_tc_sender: Sender, + // pub sched_tc_sender: Sender, + // pub hk_tc_sender: Sender, + // pub action_tc_sender: Sender, + // pub mode_tc_sender: Sender, +} + +pub struct PusReceiver { + pub id: ComponentId, + pub tm_sender: TmSender, + pub verif_reporter: VerificationReporter, + pub pus_router: PusTcMpscRouter, + stamp_helper: TimeStampHelper, +} + +impl PusReceiver { + pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { + Self { + id: PUS_ROUTING_SERVICE.raw(), + tm_sender, + verif_reporter: create_verification_reporter( + PUS_ROUTING_SERVICE.id(), + PUS_ROUTING_SERVICE.apid, + ), + pus_router, + stamp_helper: TimeStampHelper::default(), + } + } + + pub fn handle_tc_packet( + &mut self, + tc_in_memory: TcInMemory, + service: u8, + pus_tc: &PusTcReader, + ) -> Result { + let init_token = self.verif_reporter.add_tc(pus_tc); + self.stamp_helper.update_from_now(); + let accepted_token = self + .verif_reporter + .acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp()) + .expect("Acceptance success failure"); + let service = PusServiceId::try_from(service); + match service { + Ok(standard_service) => match standard_service { + PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })?, + // PusServiceId::Housekeeping => { + // self.pus_router.hk_tc_sender.send(EcssTcAndToken { + // tc_in_memory, + // token: Some(accepted_token.into()), + // })? + // } + // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { + // tc_in_memory, + // token: Some(accepted_token.into()), + // })?, + // PusServiceId::Scheduling => { + // self.pus_router.sched_tc_sender.send(EcssTcAndToken { + // tc_in_memory, + // token: Some(accepted_token.into()), + // })? + // } + _ => { + let result = self.verif_reporter.start_failure( + &self.tm_sender, + accepted_token, + FailParams::new( + self.stamp_helper.stamp(), + &tmtc_err::PUS_SERVICE_NOT_IMPLEMENTED, + &[standard_service as u8], + ), + ); + if result.is_err() { + warn!("Sending verification failure failed"); + } + } + }, + Err(e) => { + if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { + match custom_service { + CustomPusServiceId::Mode => { + // self.pus_router.mode_tc_sender.send(EcssTcAndToken { + // tc_in_memory, + // token: Some(accepted_token.into()), + // })? + } + CustomPusServiceId::Health => {} + } + } else { + self.verif_reporter + .start_failure( + &self.tm_sender, + accepted_token, + FailParams::new( + self.stamp_helper.stamp(), + &tmtc_err::INVALID_PUS_SUBSERVICE, + &[e.number], + ), + ) + .expect("Start failure verification failed") + } + } + } + Ok(PusPacketHandlerResult::RequestHandled) + } +} + +pub trait TargetedPusService { + /// Returns [true] interface the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool; + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; + fn check_for_request_timeouts(&mut self); +} + +/// This is a generic handler class for all PUS services where a PUS telecommand is converted +/// to a targeted request. +/// +/// The generic steps for this process are the following +/// +/// 1. Poll for TC packets +/// 2. Convert the raw packets to a [PusTcReader]. +/// 3. Convert the PUS TC to a typed request using the [PusTcToRequestConverter]. +/// 4. Route the requests using the [GenericRequestRouter]. +/// 5. Add the request to the active request map using the [ActiveRequestMapProvider] abstraction. +/// 6. Check for replies which complete the forwarded request. The handler takes care of +/// the verification process. +/// 7. Check for timeouts of active requests. Generally, the timeout on the service level should +/// be highest expected timeout for the given target. +/// +/// The handler exposes the following API: +/// +/// 1. [Self::handle_one_tc] which tries to poll and handle one TC packet, covering steps 1-5. +/// 2. [Self::check_one_reply] which tries to poll and handle one reply, covering step 6. +/// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. +pub struct PusTargetedRequestService< + TcReceiver: EcssTcReceiverCore, + TmSender: EcssTmSenderCore, + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + RequestConverter: PusTcToRequestConverter, + ReplyHandler: PusReplyHandler, + ActiveRequestMap: ActiveRequestMapProvider, + ActiveRequestInfo: ActiveRequestProvider, + RequestType, + ReplyType, +> { + pub service_helper: + PusServiceHelper, + pub request_router: GenericRequestRouter, + pub request_converter: RequestConverter, + pub active_request_map: ActiveRequestMap, + pub reply_handler: ReplyHandler, + pub reply_receiver: mpsc::Receiver>, + phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, +} + +impl< + TcReceiver: EcssTcReceiverCore, + TmSender: EcssTmSenderCore, + TcInMemConverter: EcssTcInMemConverter, + VerificationReporter: VerificationReportingProvider, + RequestConverter: PusTcToRequestConverter, + ReplyHandler: PusReplyHandler, + ActiveRequestMap: ActiveRequestMapProvider, + ActiveRequestInfo: ActiveRequestProvider, + RequestType, + ReplyType, + > + PusTargetedRequestService< + TcReceiver, + TmSender, + TcInMemConverter, + VerificationReporter, + RequestConverter, + ReplyHandler, + ActiveRequestMap, + ActiveRequestInfo, + RequestType, + ReplyType, + > +where + GenericRequestRouter: PusRequestRouter, +{ + pub fn new( + service_helper: PusServiceHelper< + TcReceiver, + TmSender, + TcInMemConverter, + VerificationReporter, + >, + request_converter: RequestConverter, + active_request_map: ActiveRequestMap, + reply_hook: ReplyHandler, + request_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, + ) -> Self { + Self { + service_helper, + request_converter, + active_request_map, + reply_handler: reply_hook, + request_router, + reply_receiver, + phantom: std::marker::PhantomData, + } + } + + pub fn poll_and_handle_next_tc( + &mut self, + time_stamp: &[u8], + ) -> Result { + let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?; + if possible_packet.is_none() { + return Ok(PusPacketHandlerResult::Empty); + } + let ecss_tc_and_token = possible_packet.unwrap(); + self.service_helper + .tc_in_mem_converter_mut() + .cache(&ecss_tc_and_token.tc_in_memory)?; + let tc = self.service_helper.tc_in_mem_converter().convert()?; + let (mut request_info, request) = match self.request_converter.convert( + ecss_tc_and_token.token, + &tc, + self.service_helper.tm_sender(), + &self.service_helper.common.verif_reporter, + time_stamp, + ) { + Ok((info, req)) => (info, req), + Err(e) => { + self.handle_conversion_to_request_error(&e, ecss_tc_and_token.token, time_stamp); + return Err(e.into()); + } + }; + let accepted_token: VerificationToken = request_info + .token() + .try_into() + .expect("token not in expected accepted state"); + let verif_request_id = verification::RequestId::new(&tc).raw(); + match self.request_router.route( + MessageMetadata::new(verif_request_id, self.service_helper.id()), + request_info.target_id(), + request, + ) { + Ok(()) => { + let started_token = self + .service_helper + .verif_reporter() + .start_success( + &self.service_helper.common.tm_sender, + accepted_token, + time_stamp, + ) + .expect("Start success failure"); + request_info.set_token(started_token.into()); + self.active_request_map + .insert(&verif_request_id, request_info); + } + Err(e) => { + self.request_router.handle_error_generic( + &request_info, + &tc, + e.clone(), + self.service_helper.tm_sender(), + self.service_helper.verif_reporter(), + time_stamp, + ); + return Err(e.into()); + } + } + Ok(PusPacketHandlerResult::RequestHandled) + } + + fn handle_conversion_to_request_error( + &mut self, + error: &GenericConversionError, + token: VerificationToken, + time_stamp: &[u8], + ) { + match error { + GenericConversionError::WrongService(service) => { + let service_slice: [u8; 1] = [*service]; + self.service_helper + .verif_reporter() + .completion_failure( + self.service_helper.tm_sender(), + token, + FailParams::new(time_stamp, &tmtc_err::INVALID_PUS_SERVICE, &service_slice), + ) + .expect("Sending completion failure failed"); + } + GenericConversionError::InvalidSubservice(subservice) => { + let subservice_slice: [u8; 1] = [*subservice]; + self.service_helper + .verif_reporter() + .completion_failure( + self.service_helper.tm_sender(), + token, + FailParams::new( + time_stamp, + &tmtc_err::INVALID_PUS_SUBSERVICE, + &subservice_slice, + ), + ) + .expect("Sending completion failure failed"); + } + GenericConversionError::NotEnoughAppData { expected, found } => { + let mut context_info = (*found as u32).to_be_bytes().to_vec(); + context_info.extend_from_slice(&(*expected as u32).to_be_bytes()); + self.service_helper + .verif_reporter() + .completion_failure( + self.service_helper.tm_sender(), + token, + FailParams::new(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA, &context_info), + ) + .expect("Sending completion failure failed"); + } + // Do nothing.. this is service-level and can not be handled generically here. + GenericConversionError::InvalidAppData(_) => (), + } + } + + pub fn poll_and_check_next_reply( + &mut self, + time_stamp: &[u8], + ) -> Result { + match self.reply_receiver.try_recv() { + Ok(reply) => { + self.handle_reply(&reply, time_stamp)?; + Ok(HandlingStatus::HandledOne) + } + Err(e) => match e { + mpsc::TryRecvError::Empty => Ok(HandlingStatus::Empty), + mpsc::TryRecvError::Disconnected => Err(EcssTmtcError::Receive( + GenericReceiveError::TxDisconnected(None), + )), + }, + } + } + + pub fn handle_reply( + &mut self, + reply: &GenericMessage, + time_stamp: &[u8], + ) -> Result<(), EcssTmtcError> { + let active_req_opt = self.active_request_map.get(reply.request_id()); + if active_req_opt.is_none() { + self.reply_handler + .handle_unrequested_reply(reply, &self.service_helper.common.tm_sender)?; + return Ok(()); + } + let active_request = active_req_opt.unwrap(); + let request_finished = self + .reply_handler + .handle_reply( + reply, + active_request, + &self.service_helper.common.tm_sender, + &self.service_helper.common.verif_reporter, + time_stamp, + ) + .unwrap_or(false); + if request_finished { + self.active_request_map.remove(reply.request_id()); + } + Ok(()) + } + + pub fn check_for_request_timeouts(&mut self) { + let mut requests_to_delete = Vec::new(); + self.active_request_map + .for_each(|request_id, request_info| { + if request_info.has_timed_out() { + requests_to_delete.push(*request_id); + } + }); + if !requests_to_delete.is_empty() { + for request_id in requests_to_delete { + self.active_request_map.remove(request_id); + } + } + } +} + +/// Generic timeout handling: Handle the verification failure with a dedicated return code +/// and also log the error. +pub fn generic_pus_request_timeout_handler( + sender: &(impl EcssTmSenderCore + ?Sized), + active_request: &(impl ActiveRequestProvider + Debug), + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + service_str: &'static str, +) -> Result<(), EcssTmtcError> { + log::warn!("timeout for active request {active_request:?} on {service_str} service"); + let started_token: VerificationToken = active_request + .token() + .try_into() + .expect("token not in expected started state"); + verification_handler.completion_failure( + sender, + started_token, + FailParams::new(time_stamp, &tmtc_err::REQUEST_TIMEOUT, &[]), + )?; + Ok(()) +} + +#[cfg(test)] +pub(crate) mod tests { + use std::time::Duration; + + use satrs::pus::test_util::TEST_COMPONENT_ID_0; + use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant}; + use satrs::request::RequestId; + use satrs::{ + pus::{ + verification::test_util::TestVerificationReporter, ActivePusRequestStd, + ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver, + }, + request::UniqueApidTargetId, + spacepackets::{ + ecss::{ + tc::{PusTcCreator, PusTcSecondaryHeader}, + WritablePusPacket, + }, + SpHeader, + }, + }; + + use crate::requests::CompositeRequest; + + use super::*; + + // Testbench dedicated to the testing of [PusReplyHandler]s + pub struct ReplyHandlerTestbench< + ReplyHandler: PusReplyHandler, + ActiveRequestInfo: ActiveRequestProvider, + Reply, + > { + pub id: ComponentId, + pub verif_reporter: TestVerificationReporter, + pub reply_handler: ReplyHandler, + pub tm_receiver: mpsc::Receiver, + pub default_timeout: Duration, + tm_sender: MpscTmAsVecSender, + phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>, + } + + impl< + ReplyHandler: PusReplyHandler, + ActiveRequestInfo: ActiveRequestProvider, + Reply, + > ReplyHandlerTestbench + { + pub fn new(owner_id: ComponentId, reply_handler: ReplyHandler) -> Self { + let test_verif_reporter = TestVerificationReporter::new(owner_id); + let (tm_sender, tm_receiver) = mpsc::channel(); + Self { + id: TEST_COMPONENT_ID_0.raw(), + verif_reporter: test_verif_reporter, + reply_handler, + default_timeout: Duration::from_secs(30), + tm_sender, + tm_receiver, + phantom: std::marker::PhantomData, + } + } + + pub fn add_tc( + &mut self, + apid: u16, + apid_target: u32, + time_stamp: &[u8], + ) -> (verification::RequestId, ActivePusRequestStd) { + let sp_header = SpHeader::new_from_apid(apid); + let sec_header_dummy = PusTcSecondaryHeader::new_simple(0, 0); + let init = self.verif_reporter.add_tc(&PusTcCreator::new( + sp_header, + sec_header_dummy, + &[], + true, + )); + let accepted = self + .verif_reporter + .acceptance_success(&self.tm_sender, init, time_stamp) + .expect("acceptance failed"); + let started = self + .verif_reporter + .start_success(&self.tm_sender, accepted, time_stamp) + .expect("start failed"); + ( + started.request_id(), + ActivePusRequestStd::new( + UniqueApidTargetId::new(apid, apid_target).raw(), + started, + self.default_timeout, + ), + ) + } + + pub fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActiveRequestInfo, + time_stamp: &[u8], + ) -> Result { + self.reply_handler.handle_reply( + reply, + active_request, + &self.tm_sender, + &self.verif_reporter, + time_stamp, + ) + } + + pub fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + ) -> Result<(), ReplyHandler::Error> { + self.reply_handler + .handle_unrequested_reply(reply, &self.tm_sender) + } + pub fn handle_request_timeout( + &mut self, + active_request_info: &ActiveRequestInfo, + time_stamp: &[u8], + ) -> Result<(), ReplyHandler::Error> { + self.reply_handler.handle_request_timeout( + active_request_info, + &self.tm_sender, + &self.verif_reporter, + time_stamp, + ) + } + } + + #[derive(Default)] + pub struct DummySender {} + + /// Dummy sender component which does nothing on the [Self::send_tm] call. + /// + /// Useful for unit tests. + impl EcssTmSenderCore for DummySender { + fn send_tm(&self, _source_id: ComponentId, _tm: PusTmVariant) -> Result<(), EcssTmtcError> { + Ok(()) + } + } + + // Testbench dedicated to the testing of [PusTcToRequestConverter]s + pub struct PusConverterTestbench< + Converter: PusTcToRequestConverter, + ActiveRequestInfo: ActiveRequestProvider, + Request, + > { + pub id: ComponentId, + pub verif_reporter: TestVerificationReporter, + pub converter: Converter, + dummy_sender: DummySender, + current_request_id: Option, + current_packet: Option>, + phantom: std::marker::PhantomData<(ActiveRequestInfo, Request)>, + } + + impl< + Converter: PusTcToRequestConverter, + ActiveRequestInfo: ActiveRequestProvider, + Request, + > PusConverterTestbench + { + pub fn new(owner_id: ComponentId, converter: Converter) -> Self { + let test_verif_reporter = TestVerificationReporter::new(owner_id); + Self { + id: owner_id, + verif_reporter: test_verif_reporter, + converter, + dummy_sender: DummySender::default(), + current_request_id: None, + current_packet: None, + phantom: std::marker::PhantomData, + } + } + + pub fn add_tc(&mut self, tc: &PusTcCreator) -> VerificationToken { + let token = self.verif_reporter.add_tc(tc); + self.current_request_id = Some(verification::RequestId::new(tc)); + self.current_packet = Some(tc.to_vec().unwrap()); + self.verif_reporter + .acceptance_success(&self.dummy_sender, token, &[]) + .expect("acceptance failed") + } + + pub fn request_id(&self) -> Option { + self.current_request_id + } + + pub fn convert( + &mut self, + token: VerificationToken, + time_stamp: &[u8], + expected_apid: u16, + expected_apid_target: u32, + ) -> Result<(ActiveRequestInfo, Request), Converter::Error> { + if self.current_packet.is_none() { + return Err(GenericConversionError::InvalidAppData( + "call add_tc first".to_string(), + )); + } + let current_packet = self.current_packet.take().unwrap(); + let tc_reader = PusTcReader::new(¤t_packet).unwrap(); + let (active_info, request) = self.converter.convert( + token, + &tc_reader.0, + &self.dummy_sender, + &self.verif_reporter, + time_stamp, + )?; + assert_eq!( + active_info.token().request_id(), + self.request_id().expect("no request id is set") + ); + assert_eq!( + active_info.target_id(), + UniqueApidTargetId::new(expected_apid, expected_apid_target).raw() + ); + Ok((active_info, request)) + } + } + + pub struct TargetedPusRequestTestbench< + RequestConverter: PusTcToRequestConverter, + ReplyHandler: PusReplyHandler, + ActiveRequestMap: ActiveRequestMapProvider, + ActiveRequestInfo: ActiveRequestProvider, + RequestType, + ReplyType, + > { + pub service: PusTargetedRequestService< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + TestVerificationReporter, + RequestConverter, + ReplyHandler, + ActiveRequestMap, + ActiveRequestInfo, + RequestType, + ReplyType, + >, + pub request_id: Option, + pub tm_funnel_rx: mpsc::Receiver, + pub pus_packet_tx: mpsc::Sender, + pub reply_tx: mpsc::Sender>, + pub request_rx: mpsc::Receiver>, + } +} diff --git a/src/pus/stack.rs b/src/pus/stack.rs new file mode 100644 index 0000000..acb6000 --- /dev/null +++ b/src/pus/stack.rs @@ -0,0 +1,75 @@ +// use crate::pus::mode::ModeServiceWrapper; +use derive_new::new; +use satrs::{ + pus::{EcssTcInMemConverter, EcssTmSenderCore}, + spacepackets::time::{cds, TimeWriter}, +}; +use crate::pus::HandlingStatus; +use crate::pus::test::TestCustomServiceWrapper; + +// use super::{ +// action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, +// scheduler::SchedulingServiceWrapper, test::TestCustomServiceWrapper, HandlingStatus, +// TargetedPusService, +// }; + +#[derive(new)] +pub struct PusStack { + test_srv: TestCustomServiceWrapper, + // hk_srv_wrapper: HkServiceWrapper, + // event_srv: EventServiceWrapper, + // action_srv_wrapper: ActionServiceWrapper, + // schedule_srv: SchedulingServiceWrapper, + // mode_srv: ModeServiceWrapper, +} + +impl + PusStack +{ + pub fn periodic_operation(&mut self) { + // Release all telecommands which reached their release time before calling the service + // handlers. + // self.schedule_srv.release_tcs(); + let time_stamp = cds::CdsTime::now_with_u16_days() + .expect("time stamp generation error") + .to_vec() + .unwrap(); + loop { + let mut nothing_to_do = true; + let mut is_srv_finished = + |tc_handling_done: bool, reply_handling_done: Option| { + if !tc_handling_done + || (reply_handling_done.is_some() + && reply_handling_done.unwrap() == HandlingStatus::Empty) + { + nothing_to_do = false; + } + }; + is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None); + // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); + // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); + // is_srv_finished( + // self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + // Some( + // self.action_srv_wrapper + // .poll_and_handle_next_reply(&time_stamp), + // ), + // ); + // is_srv_finished( + // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), + // ); + // is_srv_finished( + // self.mode_srv.poll_and_handle_next_tc(&time_stamp), + // Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)), + // ); + if nothing_to_do { + // Timeout checking is only done once. + // self.action_srv_wrapper.check_for_request_timeouts(); + // self.hk_srv_wrapper.check_for_request_timeouts(); + // self.mode_srv.check_for_request_timeouts(); + break; + } + } + } +} diff --git a/src/pus/test.rs b/src/pus/test.rs new file mode 100644 index 0000000..9c8047f --- /dev/null +++ b/src/pus/test.rs @@ -0,0 +1,127 @@ +use crate::pus::create_verification_reporter; +use log::{info, warn}; +use satrs::event_man::{EventMessage, EventMessageU32}; +use satrs::pool::SharedStaticMemoryPool; +use satrs::pus::test::PusService17TestHandler; +use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; +use satrs::pus::EcssTcInSharedStoreConverter; +use satrs::pus::{ + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, + MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper, + PusTmAsVec, PusTmInPool, TmInSharedPoolSender, +}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::PusPacket; +use satrs::spacepackets::time::cds::CdsTime; +use satrs::spacepackets::time::TimeWriter; +use std::sync::mpsc; +use std::sync::mpsc::Sender; +use ops_sat_rs::config::components::PUS_TEST_SERVICE; +use ops_sat_rs::config::tmtc_err; + +pub fn create_test_service_dynamic( + tm_funnel_tx: mpsc::Sender, + // event_sender: mpsc::Sender, + pus_test_rx: mpsc::Receiver, +) -> TestCustomServiceWrapper, EcssTcInVecConverter> { + let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( + PUS_TEST_SERVICE.id(), + pus_test_rx, + tm_funnel_tx, + create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid), + EcssTcInVecConverter::default(), + )); + TestCustomServiceWrapper { + handler: pus17_handler, + // test_srv_event_sender: event_sender, + } +} + +pub struct TestCustomServiceWrapper< + TmSender: EcssTmSenderCore, + TcInMemConverter: EcssTcInMemConverter, +> { + pub handler: + PusService17TestHandler, + // pub test_srv_event_sender: mpsc::Sender, +} + +impl + TestCustomServiceWrapper +{ + pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool { + let res = self.handler.poll_and_handle_next_tc(time_stamp); + if res.is_err() { + warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); + return true; + } + match res.unwrap() { + PusPacketHandlerResult::RequestHandled => { + info!("Received PUS ping command TC[17,1]"); + info!("Sent ping reply PUS TM[17,2]"); + } + PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => { + warn!( + "Handled PUS ping command with partial success: {:?}", + partial_err + ); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS17: Subservice {subservice} not implemented") + } + // TODO: adapt interface events are implemented + PusPacketHandlerResult::CustomSubservice(subservice, token) => { + let (tc, _) = PusTcReader::new( + self.handler + .service_helper + .tc_in_mem_converter + .tc_slice_raw(), + ) + .unwrap(); + let time_stamper = CdsTime::now_with_u16_days().unwrap(); + let mut stamp_buf: [u8; 7] = [0; 7]; + time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); + if subservice == 128 { + info!("Generating test event"); + // self.test_srv_event_sender + // .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) + // .expect("Sending test event failed"); + let start_token = self + .handler + .service_helper + .verif_reporter() + .start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf) + .expect("Error sending start success"); + self.handler + .service_helper + .verif_reporter() + .completion_success( + self.handler.service_helper.tm_sender(), + start_token, + &stamp_buf, + ) + .expect("Error sending completion success"); + } else { + let fail_data = [tc.subservice()]; + self.handler + .service_helper + .verif_reporter() + .start_failure( + self.handler.service_helper.tm_sender(), + token, + FailParams::new( + &stamp_buf, + &tmtc_err::INVALID_PUS_SUBSERVICE, + &fail_data, + ), + ) + .expect("Sending start failure verification failed"); + } + } + PusPacketHandlerResult::Empty => { + return true; + } + } + false + } +} diff --git a/src/requests.rs b/src/requests.rs new file mode 100644 index 0000000..2fc0824 --- /dev/null +++ b/src/requests.rs @@ -0,0 +1,152 @@ +use std::collections::HashMap; +use std::sync::mpsc; + +use log::warn; +use satrs::action::ActionRequest; +use satrs::hk::HkRequest; +use satrs::mode::ModeRequest; +use satrs::pus::verification::{ + FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ActiveRequestProvider, EcssTmSenderCore, GenericRoutingError, PusRequestRouter}; +use satrs::queue::GenericSendError; +use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::PusPacket; +use satrs::ComponentId; +use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; +use ops_sat_rs::config::tmtc_err; + +#[derive(Clone, Debug)] +#[non_exhaustive] +pub enum CompositeRequest { + Hk(HkRequest), + Action(ActionRequest), +} + +#[derive(Clone)] +pub struct GenericRequestRouter { + pub id: ComponentId, + // All messages which do not have a dedicated queue. + pub composite_router_map: HashMap>>, + pub mode_router_map: HashMap>>, +} + +impl Default for GenericRequestRouter { + fn default() -> Self { + Self { + id: PUS_ROUTING_SERVICE.raw(), + composite_router_map: Default::default(), + mode_router_map: Default::default(), + } + } +} +impl GenericRequestRouter { + pub(crate) fn handle_error_generic( + &self, + active_request: &impl ActiveRequestProvider, + tc: &PusTcReader, + error: GenericRoutingError, + tm_sender: &(impl EcssTmSenderCore + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) { + warn!( + "Routing request for service {} failed: {error:?}", + tc.service() + ); + let accepted_token: VerificationToken = active_request + .token() + .try_into() + .expect("token is not in accepted state"); + match error { + GenericRoutingError::UnknownTargetId(id) => { + let apid_target_id = UniqueApidTargetId::from(id); + warn!("Target APID for request: {}", apid_target_id.apid); + warn!("Target Unique ID for request: {}", apid_target_id.unique_id); + let mut fail_data: [u8; 8] = [0; 8]; + fail_data.copy_from_slice(&id.to_be_bytes()); + verif_reporter + .completion_failure( + tm_sender, + accepted_token, + FailParams::new(time_stamp, &tmtc_err::UNKNOWN_TARGET_ID, &fail_data), + ) + .expect("Sending start failure failed"); + } + GenericRoutingError::Send(_) => { + let mut fail_data: [u8; 8] = [0; 8]; + fail_data.copy_from_slice(&active_request.target_id().to_be_bytes()); + verif_reporter + .completion_failure( + tm_sender, + accepted_token, + FailParams::new(time_stamp, &tmtc_err::ROUTING_ERROR, &fail_data), + ) + .expect("Sending start failure failed"); + } + } + } +} +impl PusRequestRouter for GenericRequestRouter { + type Error = GenericRoutingError; + + fn route( + &self, + requestor_info: MessageMetadata, + target_id: ComponentId, + hk_request: HkRequest, + ) -> Result<(), Self::Error> { + if let Some(sender) = self.composite_router_map.get(&target_id) { + sender + .send(GenericMessage::new( + requestor_info, + CompositeRequest::Hk(hk_request), + )) + .map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?; + return Ok(()); + } + Err(GenericRoutingError::UnknownTargetId(target_id)) + } +} + +impl PusRequestRouter for GenericRequestRouter { + type Error = GenericRoutingError; + + fn route( + &self, + requestor_info: MessageMetadata, + target_id: ComponentId, + action_request: ActionRequest, + ) -> Result<(), Self::Error> { + if let Some(sender) = self.composite_router_map.get(&target_id) { + sender + .send(GenericMessage::new( + requestor_info, + CompositeRequest::Action(action_request), + )) + .map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?; + return Ok(()); + } + Err(GenericRoutingError::UnknownTargetId(target_id)) + } +} + +impl PusRequestRouter for GenericRequestRouter { + type Error = GenericRoutingError; + + fn route( + &self, + requestor_info: MessageMetadata, + target_id: ComponentId, + request: ModeRequest, + ) -> Result<(), Self::Error> { + if let Some(sender) = self.mode_router_map.get(&target_id) { + sender + .send(GenericMessage::new(requestor_info, request)) + .map_err(|_| GenericRoutingError::Send(GenericSendError::RxDisconnected))?; + return Ok(()); + } + Err(GenericRoutingError::UnknownTargetId(target_id)) + } +} diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs new file mode 100644 index 0000000..f81d6bf --- /dev/null +++ b/src/tm_funnel.rs @@ -0,0 +1,157 @@ +use std::{ + collections::HashMap, + sync::mpsc::{self}, +}; + +use log::info; +use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::{ + pool::PoolProvider, + seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, + spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + time::cds::MIN_CDS_FIELD_LEN, + CcsdsPacket, + }, + tmtc::tm_helper::SharedTmPool, +}; + +use crate::interface::tcp::SyncTcpTmSource; + +#[derive(Default)] +pub struct CcsdsSeqCounterMap { + apid_seq_counter_map: HashMap, +} +impl CcsdsSeqCounterMap { + pub fn get_and_increment(&mut self, apid: u16) -> u16 { + self.apid_seq_counter_map + .entry(apid) + .or_default() + .get_and_increment() + } +} + +pub struct TmFunnelCommon { + seq_counter_map: CcsdsSeqCounterMap, + msg_counter_map: HashMap, + sync_tm_tcp_source: SyncTcpTmSource, +} + +impl TmFunnelCommon { + pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { + Self { + seq_counter_map: Default::default(), + msg_counter_map: Default::default(), + sync_tm_tcp_source, + } + } + + // Applies common packet processing operations for PUS TM packets. This includes setting + // a sequence counter + fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { + // zero_copy_writer.set_apid(PUS_APID); + zero_copy_writer.set_seq_count( + self.seq_counter_map + .get_and_increment(zero_copy_writer.apid()), + ); + let entry = self + .msg_counter_map + .entry(zero_copy_writer.service()) + .or_insert(0); + zero_copy_writer.set_msg_count(*entry); + if *entry == u16::MAX { + *entry = 0; + } else { + *entry += 1; + } + + Self::packet_printout(&zero_copy_writer); + // This operation has to come last! + zero_copy_writer.finish(); + } + + fn packet_printout(tm: &PusTmZeroCopyWriter) { + info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); + } +} + +pub struct TmFunnelStatic { + common: TmFunnelCommon, + shared_tm_store: SharedTmPool, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, +} + +impl TmFunnelStatic { + pub fn new( + shared_tm_store: SharedTmPool, + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::SyncSender, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + shared_tm_store, + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let shared_pool = self.shared_tm_store.clone_backing_pool(); + let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); + let mut tm_copy = Vec::new(); + pool_guard + .modify(&pus_tm_in_pool.store_addr, |buf| { + let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + tm_copy = buf.to_vec() + }) + .expect("Reading TM from pool failed"); + self.tm_server_tx + .send(pus_tm_in_pool) + .expect("Sending TM to server failed"); + // We could also do this step in the update closure, but I'd rather avoid this, could + // lead to nested locking. + self.common.sync_tm_tcp_source.add_tm(&tm_copy); + } + } +} + +pub struct TmFunnelDynamic { + common: TmFunnelCommon, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, +} + +impl TmFunnelDynamic { + pub fn new( + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + tm_funnel_rx, + tm_server_tx, + } + } + + pub fn operation(&mut self) { + if let Ok(mut tm) = self.tm_funnel_rx.recv() { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.common.sync_tm_tcp_source.add_tm(&tm.packet); + self.tm_server_tx + .send(tm) + .expect("Sending TM to server failed"); + } + } +} diff --git a/src/tmtc.rs b/src/tmtc.rs index 1857bb9..b9f1809 100644 --- a/src/tmtc.rs +++ b/src/tmtc.rs @@ -4,6 +4,21 @@ use satrs::{ tmtc::ReceivesCcsdsTc, }; use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; +use satrs::pool::{StoreAddr, StoreError}; +use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; +use satrs::spacepackets::ecss::PusPacket; +use crate::pus::PusReceiver; +use thiserror::Error; + +#[derive(Debug, Clone, PartialEq, Eq, Error)] +pub enum MpscStoreAndSendError { + #[error("Store error: {0}")] + Store(#[from] StoreError), + #[error("TC send error: {0}")] + TcSend(#[from] SendError), + #[error("TMTC send error: {0}")] + TmTcSend(#[from] SendError), +} // Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. #[derive(Clone)] @@ -30,17 +45,17 @@ impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { // TC source components where the heap is the backing memory of the received telecommands. pub struct TcSourceTaskDynamic { pub tc_receiver: mpsc::Receiver>, - // pus_receiver: PusReceiver, + pus_receiver: PusReceiver, } impl TcSourceTaskDynamic { pub fn new( tc_receiver: mpsc::Receiver>, - // pus_receiver: PusReceiver, + pus_receiver: PusReceiver, ) -> Self { Self { tc_receiver, - // pus_receiver, + pus_receiver, } } @@ -52,7 +67,6 @@ impl TcSourceTaskDynamic { match self.tc_receiver.try_recv() { Ok(tc) => match PusTcReader::new(&tc) { Ok((pus_tc, _)) => { - /* self.pus_receiver .handle_tc_packet( satrs::pus::TcInMemory::Vec(tc.clone()), @@ -60,7 +74,6 @@ impl TcSourceTaskDynamic { &pus_tc, ) .ok(); - */ true } Err(e) => { From b5b01b2cebe760bc56fd6473b44ae996ac1d8e1d Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 9 Apr 2024 13:52:02 +0200 Subject: [PATCH 2/4] cargo fmt and cargo clippy --- Cargo.lock | 12 +++++----- Cargo.toml | 1 + src/ccsds.rs | 4 ++-- src/config.rs | 10 ++++----- src/interface/udp.rs | 5 ++--- src/lib.rs | 1 - src/main.rs | 33 ++++++++++++++-------------- src/pus/mod.rs | 14 +++++++----- src/pus/stack.rs | 4 ++-- src/pus/test.rs | 14 +++++------- src/requests.rs | 6 +++-- src/tm_funnel.rs | 52 ++------------------------------------------ src/tmtc.rs | 8 +++---- 13 files changed, 58 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d5e3bb..8eede90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,9 +58,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bumpalo" -version = "3.15.4" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bus" @@ -81,9 +81,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd97381a8cc6493395a5afc4c691c1084b3768db713b73aa215217aa245d153" +checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41" [[package]] name = "cfg-if" @@ -119,9 +119,9 @@ checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "crc" -version = "3.2.0" +version = "3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2b432c56615136f8dba245fed7ec3d5518c500a31108661067e61e72fe7e6bc" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" dependencies = [ "crc-catalog", ] diff --git a/Cargo.toml b/Cargo.toml index 17f5bc8..5ed661d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ num_enum = "0.7" version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" +feature = ["test_util"] [dependencies.satrs-mib] version = "0.1.1" diff --git a/src/ccsds.rs b/src/ccsds.rs index e1417f4..0ba4776 100644 --- a/src/ccsds.rs +++ b/src/ccsds.rs @@ -1,9 +1,9 @@ +use ops_sat_rs::config::components::Apid; +use ops_sat_rs::config::APID_VALIDATOR; use satrs::pus::ReceivesEcssPusTc; use satrs::spacepackets::{CcsdsPacket, SpHeader}; use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; use satrs::ValidatorU16Id; -use ops_sat_rs::config::components::Apid; -use ops_sat_rs::config::APID_VALIDATOR; #[derive(Clone)] pub struct CcsdsReceiver< diff --git a/src/config.rs b/src/config.rs index 53a54df..cccd73c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,10 @@ use lazy_static::lazy_static; +use num_enum::{IntoPrimitive, TryFromPrimitive}; use satrs::spacepackets::{PacketId, PacketType}; +use satrs_mib::res_code::ResultU16Info; +use satrs_mib::resultcode; use std::{collections::HashSet, net::Ipv4Addr}; use strum::IntoEnumIterator; -use satrs_mib::resultcode; -use satrs_mib::res_code::ResultU16Info; -use num_enum::{IntoPrimitive, TryFromPrimitive}; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; @@ -41,8 +41,8 @@ pub enum GroupId { } pub mod tmtc_err { - use satrs::res_code::ResultU16; use super::*; + use satrs::res_code::ResultU16; #[resultcode] pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 0); @@ -60,7 +60,7 @@ pub mod tmtc_err { pub const REQUEST_TIMEOUT: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 6); #[resultcode( - info = "Not enough data inside the TC application data field. Optionally includes: \ + info = "Not enough data inside the TC application data field. Optionally includes: \ 8 bytes of failure data containing 2 failure parameters, \ P1 (u32 big endian): Expected data length, P2: Found data length" )] diff --git a/src/interface/udp.rs b/src/interface/udp.rs index b2e3b62..5c45e9e 100644 --- a/src/interface/udp.rs +++ b/src/interface/udp.rs @@ -2,10 +2,9 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; -use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::pus::PusTmAsVec; use satrs::{ hal::std::udp_server::{ReceiveResult, UdpTcServer}, - pool::{PoolProviderWithGuards, SharedStaticMemoryPool}, tmtc::CcsdsError, }; @@ -90,7 +89,7 @@ mod tests { }, tmtc::ReceivesTcCore, }; - use satrs_example::config::{components, OBSW_SERVER_ADDR}; + use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use super::*; diff --git a/src/lib.rs b/src/lib.rs index ef68b47..d637744 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -use std::net::Ipv4Addr; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; diff --git a/src/main.rs b/src/main.rs index 60a36d5..2db5ca5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use std::{ }; use log::info; +use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use ops_sat_rs::config::{ tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, }; @@ -13,29 +14,27 @@ use satrs::{ hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, tmtc::CcsdsDistributor, }; -use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; -use crate::{ - ccsds::CcsdsReceiver, - logger::setup_logger, - interface::tcp::{SyncTcpTmSource, TcpTask}, - tmtc::PusTcSourceProviderDynamic, - interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, -}; -use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::pus::stack::PusStack; use crate::pus::test::create_test_service_dynamic; -use crate::requests::GenericRequestRouter; +use crate::pus::{PusReceiver, PusTcMpscRouter}; use crate::tm_funnel::TmFunnelDynamic; use crate::tmtc::TcSourceTaskDynamic; +use crate::{ + ccsds::CcsdsReceiver, + interface::tcp::{SyncTcpTmSource, TcpTask}, + interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, + logger::setup_logger, + tmtc::PusTcSourceProviderDynamic, +}; mod ccsds; -mod logger; -mod tmtc; -mod requests; -mod pus; -mod tm_funnel; mod interface; +mod logger; +mod pus; +mod requests; +mod tm_funnel; +mod tmtc; #[allow(dead_code)] fn main() { @@ -136,7 +135,7 @@ fn main() { tcp_ccsds_distributor, PACKET_ID_VALIDATOR.clone(), ) - .expect("tcp server creation failed"); + .expect("tcp server creation failed"); let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); @@ -193,4 +192,4 @@ fn main() { jh_pus_handler .join() .expect("Joining PUS handler thread failed"); -} \ No newline at end of file +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index d32629d..62deb27 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,8 +1,12 @@ -pub mod test; pub mod stack; +pub mod test; use crate::requests::GenericRequestRouter; +use crate::tmtc::MpscStoreAndSendError; use log::warn; +use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; +use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; +use ops_sat_rs::TimeStampHelper; use satrs::pus::verification::{ self, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter, VerificationReporterCfg, VerificationReportingProvider, VerificationToken, @@ -18,12 +22,8 @@ use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusServiceId; use satrs::ComponentId; -use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; -use ops_sat_rs::TimeStampHelper; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; -use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; -use crate::tmtc::MpscStoreAndSendError; // pub mod action; // pub mod event; @@ -34,6 +34,7 @@ use crate::tmtc::MpscStoreAndSendError; // pub mod test; #[derive(Debug, PartialEq, Eq, Copy, Clone)] +#[allow(dead_code)] pub enum HandlingStatus { Empty, HandledOne, @@ -185,6 +186,7 @@ pub trait TargetedPusService { /// 1. [Self::handle_one_tc] which tries to poll and handle one TC packet, covering steps 1-5. /// 2. [Self::check_one_reply] which tries to poll and handle one reply, covering step 6. /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. +#[allow(dead_code)] pub struct PusTargetedRequestService< TcReceiver: EcssTcReceiverCore, TmSender: EcssTmSenderCore, @@ -207,6 +209,7 @@ pub struct PusTargetedRequestService< phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, } +#[allow(dead_code)] impl< TcReceiver: EcssTcReceiverCore, TmSender: EcssTmSenderCore, @@ -437,6 +440,7 @@ where /// Generic timeout handling: Handle the verification failure with a dedicated return code /// and also log the error. +#[allow(dead_code)] pub fn generic_pus_request_timeout_handler( sender: &(impl EcssTmSenderCore + ?Sized), active_request: &(impl ActiveRequestProvider + Debug), diff --git a/src/pus/stack.rs b/src/pus/stack.rs index acb6000..f5aa1c1 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -1,11 +1,11 @@ // use crate::pus::mode::ModeServiceWrapper; +use crate::pus::test::TestCustomServiceWrapper; +use crate::pus::HandlingStatus; use derive_new::new; use satrs::{ pus::{EcssTcInMemConverter, EcssTmSenderCore}, spacepackets::time::{cds, TimeWriter}, }; -use crate::pus::HandlingStatus; -use crate::pus::test::TestCustomServiceWrapper; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, diff --git a/src/pus/test.rs b/src/pus/test.rs index 9c8047f..ec63005 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -1,29 +1,25 @@ use crate::pus::create_verification_reporter; use log::{info, warn}; -use satrs::event_man::{EventMessage, EventMessageU32}; -use satrs::pool::SharedStaticMemoryPool; +use ops_sat_rs::config::components::PUS_TEST_SERVICE; +use ops_sat_rs::config::tmtc_err; +// use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; -use satrs::pus::EcssTcInSharedStoreConverter; use satrs::pus::{ EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, - MpscTmAsVecSender, MpscTmInSharedPoolSenderBounded, PusPacketHandlerResult, PusServiceHelper, - PusTmAsVec, PusTmInPool, TmInSharedPoolSender, + MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; use std::sync::mpsc; -use std::sync::mpsc::Sender; -use ops_sat_rs::config::components::PUS_TEST_SERVICE; -use ops_sat_rs::config::tmtc_err; pub fn create_test_service_dynamic( tm_funnel_tx: mpsc::Sender, // event_sender: mpsc::Sender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper, EcssTcInVecConverter> { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, diff --git a/src/requests.rs b/src/requests.rs index 2fc0824..90ed366 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::sync::mpsc; use log::warn; +use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; +use ops_sat_rs::config::tmtc_err; use satrs::action::ActionRequest; use satrs::hk::HkRequest; use satrs::mode::ModeRequest; @@ -14,8 +16,6 @@ use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::ComponentId; -use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; -use ops_sat_rs::config::tmtc_err; #[derive(Clone, Debug)] #[non_exhaustive] @@ -41,6 +41,8 @@ impl Default for GenericRequestRouter { } } } + +#[allow(dead_code)] impl GenericRequestRouter { pub(crate) fn handle_error_generic( &self, diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs index f81d6bf..20c9d91 100644 --- a/src/tm_funnel.rs +++ b/src/tm_funnel.rs @@ -4,16 +4,14 @@ use std::{ }; use log::info; -use satrs::pus::{PusTmAsVec, PusTmInPool}; +use satrs::pus::PusTmAsVec; use satrs::{ - pool::PoolProvider, seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, spacepackets::{ ecss::{tm::PusTmZeroCopyWriter, PusPacket}, time::cds::MIN_CDS_FIELD_LEN, CcsdsPacket, }, - tmtc::tm_helper::SharedTmPool, }; use crate::interface::tcp::SyncTcpTmSource; @@ -22,6 +20,7 @@ use crate::interface::tcp::SyncTcpTmSource; pub struct CcsdsSeqCounterMap { apid_seq_counter_map: HashMap, } + impl CcsdsSeqCounterMap { pub fn get_and_increment(&mut self, apid: u16) -> u16 { self.apid_seq_counter_map @@ -75,53 +74,6 @@ impl TmFunnelCommon { } } -pub struct TmFunnelStatic { - common: TmFunnelCommon, - shared_tm_store: SharedTmPool, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, -} - -impl TmFunnelStatic { - pub fn new( - shared_tm_store: SharedTmPool, - sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::SyncSender, - ) -> Self { - Self { - common: TmFunnelCommon::new(sync_tm_tcp_source), - shared_tm_store, - tm_funnel_rx, - tm_server_tx, - } - } - - pub fn operation(&mut self) { - if let Ok(pus_tm_in_pool) = self.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let shared_pool = self.shared_tm_store.clone_backing_pool(); - let mut pool_guard = shared_pool.write().expect("Locking TM pool failed"); - let mut tm_copy = Vec::new(); - pool_guard - .modify(&pus_tm_in_pool.store_addr, |buf| { - let zero_copy_writer = PusTmZeroCopyWriter::new(buf, MIN_CDS_FIELD_LEN) - .expect("Creating TM zero copy writer failed"); - self.common.apply_packet_processing(zero_copy_writer); - tm_copy = buf.to_vec() - }) - .expect("Reading TM from pool failed"); - self.tm_server_tx - .send(pus_tm_in_pool) - .expect("Sending TM to server failed"); - // We could also do this step in the update closure, but I'd rather avoid this, could - // lead to nested locking. - self.common.sync_tm_tcp_source.add_tm(&tm_copy); - } - } -} - pub struct TmFunnelDynamic { common: TmFunnelCommon, tm_funnel_rx: mpsc::Receiver, diff --git a/src/tmtc.rs b/src/tmtc.rs index b9f1809..85eb5e6 100644 --- a/src/tmtc.rs +++ b/src/tmtc.rs @@ -1,13 +1,13 @@ +use crate::pus::PusReceiver; +use satrs::pool::{StoreAddr, StoreError}; +use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; +use satrs::spacepackets::ecss::PusPacket; use satrs::{ pus::ReceivesEcssPusTc, spacepackets::{ecss::tc::PusTcReader, SpHeader}, tmtc::ReceivesCcsdsTc, }; use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; -use satrs::pool::{StoreAddr, StoreError}; -use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; -use satrs::spacepackets::ecss::PusPacket; -use crate::pus::PusReceiver; use thiserror::Error; #[derive(Debug, Clone, PartialEq, Eq, Error)] From c415755d502073ab126a8e15fa0060564ecb5692 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 9 Apr 2024 13:54:34 +0200 Subject: [PATCH 3/4] enable test feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5ed661d..3ea750e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ num_enum = "0.7" version = "0.2.0-rc.0" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" -feature = ["test_util"] +features = ["test_util"] [dependencies.satrs-mib] version = "0.1.1" From 08432233cf7341c8bbdf697cc32896bceb988aa6 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 9 Apr 2024 13:56:18 +0200 Subject: [PATCH 4/4] bbetter name for pyclient --- {ops-sat-rs-tmtc => pytmtc}/.gitignore | 0 {ops-sat-rs-tmtc => pytmtc}/common.py | 0 {ops-sat-rs-tmtc => pytmtc}/main.py | 0 {ops-sat-rs-tmtc => pytmtc}/pus_tc.py | 0 {ops-sat-rs-tmtc => pytmtc}/pus_tm.py | 0 {ops-sat-rs-tmtc => pytmtc}/requirements.txt | 0 {ops-sat-rs-tmtc => pytmtc}/tc_definitions.py | 0 {ops-sat-rs-tmtc => pytmtc}/tmtc_conf.json | 0 8 files changed, 0 insertions(+), 0 deletions(-) rename {ops-sat-rs-tmtc => pytmtc}/.gitignore (100%) rename {ops-sat-rs-tmtc => pytmtc}/common.py (100%) rename {ops-sat-rs-tmtc => pytmtc}/main.py (100%) mode change 100644 => 100755 rename {ops-sat-rs-tmtc => pytmtc}/pus_tc.py (100%) rename {ops-sat-rs-tmtc => pytmtc}/pus_tm.py (100%) rename {ops-sat-rs-tmtc => pytmtc}/requirements.txt (100%) rename {ops-sat-rs-tmtc => pytmtc}/tc_definitions.py (100%) rename {ops-sat-rs-tmtc => pytmtc}/tmtc_conf.json (100%) diff --git a/ops-sat-rs-tmtc/.gitignore b/pytmtc/.gitignore similarity index 100% rename from ops-sat-rs-tmtc/.gitignore rename to pytmtc/.gitignore diff --git a/ops-sat-rs-tmtc/common.py b/pytmtc/common.py similarity index 100% rename from ops-sat-rs-tmtc/common.py rename to pytmtc/common.py diff --git a/ops-sat-rs-tmtc/main.py b/pytmtc/main.py old mode 100644 new mode 100755 similarity index 100% rename from ops-sat-rs-tmtc/main.py rename to pytmtc/main.py diff --git a/ops-sat-rs-tmtc/pus_tc.py b/pytmtc/pus_tc.py similarity index 100% rename from ops-sat-rs-tmtc/pus_tc.py rename to pytmtc/pus_tc.py diff --git a/ops-sat-rs-tmtc/pus_tm.py b/pytmtc/pus_tm.py similarity index 100% rename from ops-sat-rs-tmtc/pus_tm.py rename to pytmtc/pus_tm.py diff --git a/ops-sat-rs-tmtc/requirements.txt b/pytmtc/requirements.txt similarity index 100% rename from ops-sat-rs-tmtc/requirements.txt rename to pytmtc/requirements.txt diff --git a/ops-sat-rs-tmtc/tc_definitions.py b/pytmtc/tc_definitions.py similarity index 100% rename from ops-sat-rs-tmtc/tc_definitions.py rename to pytmtc/tc_definitions.py diff --git a/ops-sat-rs-tmtc/tmtc_conf.json b/pytmtc/tmtc_conf.json similarity index 100% rename from ops-sat-rs-tmtc/tmtc_conf.json rename to pytmtc/tmtc_conf.json