diff --git a/Cargo.lock b/Cargo.lock index b85f20f..e4e16ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,10 +15,19 @@ dependencies = [ ] [[package]] -name = "allocator-api2" -version = "0.2.16" +name = "aho-corasick" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "android-tzdata" @@ -35,6 +44,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "array-init" version = "0.0.4" @@ -81,9 +138,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "cc" -version = "1.0.92" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2678b2e3449475e95b0aa6f9b506a28e61b3dc8996592b983695e8ebb58a8b41" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" [[package]] name = "cfg-if" @@ -93,9 +150,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", @@ -103,7 +160,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -111,6 +168,12 @@ name = "cobs" version = "0.2.3" source = "git+https://github.com/robamu/cobs.rs.git?branch=all_features#c70a7f30fd00a7cbdb7666dec12b437977385d40" +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -187,7 +250,7 @@ checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -202,6 +265,29 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -323,6 +409,18 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "nodrop" version = "0.1.14" @@ -366,7 +464,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -381,10 +479,12 @@ version = "0.0.1" dependencies = [ "chrono", "derive-new", + "env_logger", "fern", "humantime", "lazy_static", "log", + "mio", "num_enum", "satrs", "satrs-mib", @@ -422,18 +522,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.35" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] @@ -447,6 +547,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "rustversion" version = "1.0.15" @@ -462,7 +591,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "satrs" version = "0.2.0-rc.0" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "bus", "cobs", @@ -472,6 +601,7 @@ dependencies = [ "downcast-rs", "dyn-clone", "hashbrown", + "mio", "num-traits", "num_enum", "paste", @@ -486,7 +616,7 @@ dependencies = [ [[package]] name = "satrs-mib" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "csv", "satrs-mib-codegen", @@ -498,17 +628,17 @@ dependencies = [ [[package]] name = "satrs-mib-codegen" version = "0.1.1" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] name = "satrs-shared" version = "0.1.3" -source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#0fec99402803b69fbba8ac7537197e0c7f1c836f" +source = "git+https://egit.irs.uni-stuttgart.de/rust/sat-rs.git?branch=main#786671bbd785ecee4282985c730f9655134a87f9" dependencies = [ "serde", "spacepackets", @@ -542,7 +672,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -567,7 +697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -605,7 +735,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -621,9 +751,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.58" +version = "2.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" +checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" dependencies = [ "proc-macro2", "quote", @@ -647,7 +777,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] [[package]] @@ -673,12 +803,24 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -700,7 +842,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-shared", ] @@ -722,7 +864,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -739,7 +881,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", ] [[package]] @@ -748,7 +899,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", + "windows-targets 0.52.5", ] [[package]] @@ -768,17 +919,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -789,9 +941,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -801,9 +953,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -813,9 +965,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -825,9 +983,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -837,9 +995,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -849,9 +1007,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -861,9 +1019,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.4" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" @@ -892,5 +1050,5 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.58", + "syn 2.0.59", ] diff --git a/Cargo.toml b/Cargo.toml index a970584..50d51a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ strum = { version = "0.26", features = ["derive"] } thiserror = "1" derive-new = "0.6" num_enum = "0.7" +mio = "0.8" [dependencies.satrs] version = "0.2.0-rc.0" @@ -26,3 +27,14 @@ features = ["test_util"] version = "0.1.1" git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" branch = "main" + +[dev-dependencies] +env_logger = "0.11" + +# I don't think we need insane performance. If anything, a small binary is easier to upload +# to the satellite. +[profile.release] +strip = true +opt-level = "z" # Optimize for size. +lto = true +codegen-units = 1 diff --git a/README.md b/README.md index c0b42a6..069dd95 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ appears to be a useful source for documentation. - [OBSW documents](https://opssat1.esoc.esa.int/projects/experimenter-information/dmsf?folder_id=7) - [Software Integration Process](https://opssat1.esoc.esa.int/dmsf/files/34/view) +- [SPP/TCP bridge](https://opssat1.esoc.esa.int/dmsf/files/65/view) - [Cross-compiling SEPP](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Cross-compiling_SEPP_application) - [TMTC infrastructure](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Live_TM_TC_data) - [Submitting an Experiment](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Building_and_submitting_your_application_to_ESOC) diff --git a/pytmtc/.gitignore b/pytmtc/.gitignore index d994678..53b730f 100644 --- a/pytmtc/.gitignore +++ b/pytmtc/.gitignore @@ -1,3 +1,4 @@ +/tmtc_conf.json __pycache__ /venv diff --git a/pytmtc/common.py b/pytmtc/common.py index 6f56604..56b469f 100644 --- a/pytmtc/common.py +++ b/pytmtc/common.py @@ -5,11 +5,8 @@ import enum import struct -class Apid(enum.IntEnum): - SCHED = 1 - GENERIC_PUS = 2 - ACS = 3 - CFDP = 4 +EXPERIMENT_ID = 278 +EXPERIMENT_APID = 1024 + EXPERIMENT_ID class EventSeverity(enum.IntEnum): diff --git a/pytmtc/main.py b/pytmtc/main.py index a3e0caf..9fe494c 100755 --- a/pytmtc/main.py +++ b/pytmtc/main.py @@ -47,7 +47,7 @@ from tmtccmd.util.obj_id import ObjectIdDictT import pus_tc -from common import Apid, EventU32 +from common import EXPERIMENT_APID, EventU32 _LOGGER = logging.getLogger() @@ -64,8 +64,7 @@ class SatRsConfigHook(HookBase): assert self.cfg_path is not None packet_id_list = [] - for apid in Apid: - packet_id_list.append(PacketId(PacketType.TM, True, apid)) + packet_id_list.append(PacketId(PacketType.TM, True, EXPERIMENT_APID)) cfg = create_com_interface_cfg_default( com_if_key=com_if_key, json_cfg_path=self.cfg_path, @@ -181,7 +180,7 @@ class TcHandler(TcHandlerBase): tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, seq_cnt_provider=seq_count_provider, pus_verificator=self.verif_wrapper.pus_verificator, - default_pus_apid=None, + default_pus_apid=EXPERIMENT_APID, ) def send_cb(self, send_params: SendCbParams): diff --git a/pytmtc/pus_tc.py b/pytmtc/pus_tc.py index b0febdc..3a8e83d 100644 --- a/pytmtc/pus_tc.py +++ b/pytmtc/pus_tc.py @@ -10,7 +10,6 @@ from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice -from common import AcsId, Apid _LOGGER = logging.getLogger(__name__) @@ -66,14 +65,6 @@ def create_cmd_definition_tree() -> CmdTreeNode: ) root_node.add_child(scheduler_node) - acs_node = CmdTreeNode("acs", "ACS Subsystem Node") - mgm_node = CmdTreeNode("mgms", "MGM devices node") - mgm_node.add_child(mode_node) - mgm_node.add_child(hk_node) - - acs_node.add_child(mgm_node) - root_node.add_child(acs_node) - return root_node @@ -87,14 +78,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): assert len(cmd_path_list) >= 2 if cmd_path_list[1] == "ping": q.add_log_cmd("Sending PUS ping telecommand") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1) - ) + return q.add_pus_tc(PusTelecommand(service=17, subservice=1)) elif cmd_path_list[1] == "trigger_event": q.add_log_cmd("Triggering test event") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128) - ) + return q.add_pus_tc(PusTelecommand(service=17, subservice=128)) if cmd_path_list[0] == "scheduler": assert len(cmd_path_list) >= 2 if cmd_path_list[1] == "schedule_ping_10_secs_ahead": @@ -106,27 +93,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): create_time_tagged_cmd( time_stamp, PusTelecommand(service=17, subservice=1), - apid=Apid.SCHED, ) ) if cmd_path_list[0] == "acs": assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "mgms": - assert len(cmd_path_list) >= 3 - if cmd_path_list[2] == "hk": - if cmd_path_list[3] == "one_shot_hk": - q.add_log_cmd("Sending HK one shot request") - # TODO: Fix - # q.add_pus_tc( - # create_request_one_hk_command( - # make_addressable_id(Apid.ACS, AcsId.MGM_SET) - # ) - # ) - if cmd_path_list[2] == "mode": - if cmd_path_list[3] == "set_mode": - handle_set_mode_cmd( - q, "MGM 0", cmd_path_list[4], Apid.ACS, AcsId.MGM_0 - ) def handle_set_mode_cmd( diff --git a/pytmtc/tmtc_conf.json b/pytmtc/tmtc_conf.json.sample similarity index 100% rename from pytmtc/tmtc_conf.json rename to pytmtc/tmtc_conf.json.sample diff --git a/src/ccsds.rs b/src/ccsds.rs deleted file mode 100644 index 0ba4776..0000000 --- a/src/ccsds.rs +++ /dev/null @@ -1,53 +0,0 @@ -use ops_sat_rs::config::components::Apid; -use ops_sat_rs::config::APID_VALIDATOR; -use satrs::pus::ReceivesEcssPusTc; -use satrs::spacepackets::{CcsdsPacket, SpHeader}; -use satrs::tmtc::{CcsdsPacketHandler, ReceivesCcsdsTc}; -use satrs::ValidatorU16Id; - -#[derive(Clone)] -pub struct CcsdsReceiver< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone, - E, -> { - pub tc_source: TcSource, -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > ValidatorU16Id for CcsdsReceiver -{ - fn validate(&self, apid: u16) -> bool { - APID_VALIDATOR.contains(&apid) - } -} - -impl< - TcSource: ReceivesCcsdsTc + ReceivesEcssPusTc + Clone + 'static, - E: 'static, - > CcsdsPacketHandler for CcsdsReceiver -{ - type Error = E; - - fn handle_packet_with_valid_apid( - &mut self, - sp_header: &SpHeader, - tc_raw: &[u8], - ) -> Result<(), Self::Error> { - if sp_header.apid() == Apid::Cfdp as u16 { - } else { - return self.tc_source.pass_ccsds(sp_header, tc_raw); - } - Ok(()) - } - - fn handle_packet_with_unknown_apid( - &mut self, - sp_header: &SpHeader, - _tc_raw: &[u8], - ) -> Result<(), Self::Error> { - log::warn!("unknown APID 0x{:x?} detected", sp_header.apid()); - Ok(()) - } -} diff --git a/src/config.rs b/src/config.rs index cccd73c..4f0ad9e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,30 +1,26 @@ use lazy_static::lazy_static; use num_enum::{IntoPrimitive, TryFromPrimitive}; -use satrs::spacepackets::{PacketId, PacketType}; +use satrs::spacepackets::PacketId; use satrs_mib::res_code::ResultU16Info; use satrs_mib::resultcode; -use std::{collections::HashSet, net::Ipv4Addr}; -use strum::IntoEnumIterator; +use std::env; +use std::net::Ipv4Addr; +use std::path::{Path, PathBuf}; + +pub const STOP_FILE_NAME: &str = "stop-experiment"; +pub const HOME_FOLER_EXPERIMENT: &str = "/home/exp278"; pub const OBSW_SERVER_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const SERVER_PORT: u16 = 7301; +pub const TCP_SPP_SERVER_PORT: u16 = 4096; +pub const EXPERIMENT_ID: u32 = 278; +pub const EXPERIMENT_APID: u16 = 1024 + EXPERIMENT_ID as u16; +pub const EXPERIMENT_PACKET_ID: PacketId = PacketId::new_for_tc(true, EXPERIMENT_APID); +pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPERIMENT_APID)]; -lazy_static! { - pub static ref PACKET_ID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(PacketId::new(PacketType::Tc, true, id as u16)); - } - set - }; - pub static ref APID_VALIDATOR: HashSet = { - let mut set = HashSet::new(); - for id in components::Apid::iter() { - set.insert(id as u16); - } - set - }; -} +// TODO: Would be nice if this can be commanded as well.. +/// Can be enabled to print all SPP packets received from the SPP server on port 4096. +pub const SPP_CLIENT_WIRETAPPING_RX: bool = false; #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -38,6 +34,20 @@ pub enum GroupId { Tmtc = 0, Hk = 1, Mode = 2, + Action = 3, +} + +lazy_static! { + pub static ref HOME_PATH: PathBuf = { + let home_path_default = env::var("HOME").expect("HOME env variable not set"); + let mut home_path = PathBuf::new(); + home_path.push(if Path::new(HOME_FOLER_EXPERIMENT).exists() { + HOME_FOLER_EXPERIMENT + } else { + &home_path_default + }); + home_path + }; } pub mod tmtc_err { @@ -76,47 +86,56 @@ pub mod tmtc_err { ]; } +pub mod action_err { + use super::*; + use satrs::res_code::ResultU16; + + #[resultcode] + pub const INVALID_ACTION_ID: ResultU16 = ResultU16::new(GroupId::Action as u8, 0); + + pub const ACTION_RESULTS: &[ResultU16Info] = &[INVALID_ACTION_ID_EXT]; +} + pub mod components { use satrs::request::UniqueApidTargetId; - use strum::EnumIter; - #[derive(Copy, Clone, PartialEq, Eq, EnumIter)] - pub enum Apid { - Sched = 1, - GenericPus = 2, - Cfdp = 4, - } + use super::EXPERIMENT_APID; // Component IDs for components with the PUS APID. #[derive(Copy, Clone, PartialEq, Eq)] - pub enum PusId { - PusEventManagement = 0, - PusRouting = 1, - PusTest = 2, - PusAction = 3, - PusMode = 4, - PusHk = 5, - } - - #[derive(Copy, Clone, PartialEq, Eq)] - pub enum AcsId { - Mgm0 = 0, + pub enum UniqueId { + Controller = 0, + PusEventManagement = 1, + PusRouting = 2, + PusTest = 3, + PusAction = 4, + PusMode = 5, + PusHk = 6, + UdpServer = 7, + TcpServer = 8, + TcpSppClient = 9, } + pub const CONTROLLER_ID: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::Controller as u32); pub const PUS_ACTION_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusAction as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusAction as u32); pub const PUS_EVENT_MANAGEMENT: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusEventManagement as u32); pub const PUS_ROUTING_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusRouting as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusRouting as u32); pub const PUS_TEST_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusTest as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusTest as u32); pub const PUS_MODE_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusMode as u32); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusMode as u32); pub const PUS_HK_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::GenericPus as u16, PusId::PusHk as u32); - pub const PUS_SCHED_SERVICE: UniqueApidTargetId = - UniqueApidTargetId::new(Apid::Sched as u16, 0); + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::PusHk as u32); + pub const UDP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::UdpServer as u32); + pub const TCP_SERVER: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpServer as u32); + pub const TCP_SPP_CLIENT: UniqueApidTargetId = + UniqueApidTargetId::new(EXPERIMENT_APID, UniqueId::TcpSppClient as u32); } pub mod tasks { @@ -124,4 +143,7 @@ pub mod tasks { pub const FREQ_MS_EVENT_HANDLING: u64 = 400; pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_PUS_STACK: u64 = 200; + pub const FREQ_MS_CTRL: u64 = 400; + + pub const STOP_CHECK_FREQUENCY: u64 = 400; } diff --git a/src/controller.rs b/src/controller.rs new file mode 100644 index 0000000..69c27c8 --- /dev/null +++ b/src/controller.rs @@ -0,0 +1,128 @@ +use num_enum::TryFromPrimitive; +use satrs::{ + action::ActionRequest, + pus::action::{ActionReplyPus, ActionReplyVariant}, + request::{GenericMessage, MessageMetadata}, +}; +use std::{ + env::temp_dir, + path::{Path, PathBuf}, + sync::{atomic::AtomicBool, mpsc, Arc}, +}; + +use ops_sat_rs::config::{action_err::INVALID_ACTION_ID, HOME_PATH, STOP_FILE_NAME}; + +use crate::requests::CompositeRequest; + +#[derive(Debug, Clone, Copy, TryFromPrimitive)] +#[repr(u32)] +pub enum ActionId { + StopExperiment = 1, +} + +pub struct ExperimentController { + pub composite_request_rx: mpsc::Receiver>, + pub action_reply_tx: mpsc::Sender>, + pub stop_signal: Arc, + home_path_stop_file: PathBuf, + tmp_path_stop_file: PathBuf, +} + +impl ExperimentController { + pub fn new( + composite_request_rx: mpsc::Receiver>, + action_reply_tx: mpsc::Sender>, + stop_signal: Arc, + ) -> Self { + let mut home_path_stop_file = PathBuf::new(); + home_path_stop_file.push(HOME_PATH.as_path()); + home_path_stop_file.push(STOP_FILE_NAME); + let mut tmp_path_stop_file = temp_dir(); + tmp_path_stop_file.push(STOP_FILE_NAME); + Self { + composite_request_rx, + action_reply_tx, + stop_signal, + home_path_stop_file, + tmp_path_stop_file, + } + } +} + +impl ExperimentController { + pub fn perform_operation(&mut self) { + match self.composite_request_rx.try_recv() { + Ok(msg) => match msg.message { + CompositeRequest::Hk(_) => { + log::warn!("hk request handling unimplemented") + } + CompositeRequest::Action(action_req) => { + self.handle_action_request(msg.requestor_info, action_req); + } + }, + Err(e) => { + if e != mpsc::TryRecvError::Empty { + log::error!("composite request rx error: {:?}", e); + } + } + } + self.check_stop_file(); + } + + pub fn handle_action_request(&mut self, requestor: MessageMetadata, action_req: ActionRequest) { + let action_id = ActionId::try_from(action_req.action_id); + if action_id.is_err() { + let result = self.action_reply_tx.send(GenericMessage::new_action_reply( + requestor, + action_req.action_id, + ActionReplyVariant::CompletionFailed { + error_code: INVALID_ACTION_ID, + params: None, + }, + )); + if result.is_err() { + log::error!("sending action reply failed"); + } + return; + } + let action_id = action_id.unwrap(); + match action_id { + ActionId::StopExperiment => { + self.stop_signal + .store(true, std::sync::atomic::Ordering::Relaxed); + let result = self.action_reply_tx.send(GenericMessage::new_action_reply( + requestor, + action_req.action_id, + ActionReplyVariant::Completed, + )); + if result.is_err() { + log::error!("sending action reply failed"); + } + } + } + } + + pub fn check_stop_file(&self) { + let check_at_path = |path: &Path| { + if path.exists() { + log::warn!( + "Detected stop file name at {:?}. Initiating experiment shutdown", + path + ); + // By default, clear the stop file. + let result = std::fs::remove_file(path); + if result.is_err() { + log::error!( + "failed to remove stop file at {:?}: {}", + path, + result.unwrap_err() + ); + } + self.stop_signal + .store(true, std::sync::atomic::Ordering::Relaxed); + } + }; + check_at_path(self.tmp_path_stop_file.as_path()); + check_at_path(self.home_path_stop_file.as_path()); + } +} diff --git a/src/interface/can.rs b/src/interface/can.rs new file mode 100644 index 0000000..af12524 --- /dev/null +++ b/src/interface/can.rs @@ -0,0 +1 @@ +//! This is a preliminary implementation of the necessary infrastructure to enable communication over OPS-SAT's internal CAN Bus. diff --git a/src/interface/mod.rs b/src/interface/mod.rs index f7a6a76..6b6038c 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -1,2 +1,36 @@ -pub mod tcp; -pub mod udp; +use derive_new::new; +use ops_sat_rs::config::SPP_CLIENT_WIRETAPPING_RX; +use satrs::{ + encoding::ccsds::{SpValidity, SpacePacketValidator}, + spacepackets::PacketId, +}; + +pub mod can; +pub mod tcp_server; +pub mod tcp_spp_client; +pub mod udp_server; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TcpComponent { + Server, + Client, +} + +#[derive(new, Clone)] +pub struct SimpleSpValidator { + component: TcpComponent, + valid_ids: Vec, +} + +impl SpacePacketValidator for SimpleSpValidator { + fn validate(&self, sp_header: &satrs::spacepackets::SpHeader, raw_buf: &[u8]) -> SpValidity { + if SPP_CLIENT_WIRETAPPING_RX && self.component == TcpComponent::Client { + log::debug!("sp header: {:?}", sp_header); + log::debug!("raw data: {:x?}", raw_buf); + } + if self.valid_ids.contains(&sp_header.packet_id) { + return SpValidity::Valid; + } + SpValidity::Skip + } +} diff --git a/src/interface/tcp.rs b/src/interface/tcp_server.rs similarity index 53% rename from src/interface/tcp.rs rename to src/interface/tcp_server.rs index 04bb136..c3275c6 100644 --- a/src/interface/tcp.rs +++ b/src/interface/tcp_server.rs @@ -1,17 +1,19 @@ use std::{ - collections::{HashSet, VecDeque}, - sync::{Arc, Mutex}, + collections::VecDeque, + sync::{atomic::AtomicBool, mpsc, Arc, Mutex}, + time::Duration, }; use log::{info, warn}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; use satrs::{ - hal::std::tcp_server::{ServerConfig, TcpSpacepacketsServer}, - pus::ReceivesEcssPusTc, + hal::std::tcp_server::{HandledConnectionHandler, ServerConfig, TcpSpacepacketsServer}, + queue::GenericSendError, spacepackets::PacketId, - tmtc::{CcsdsDistributor, CcsdsError, ReceivesCcsdsTc, TmPacketSourceCore}, + tmtc::{PacketAsVec, PacketSource}, }; -use crate::ccsds::CcsdsReceiver; +use super::{SimpleSpValidator, TcpComponent}; #[derive(Default, Clone)] pub struct SyncTcpTmSource { @@ -41,7 +43,7 @@ impl SyncTcpTmSource { } } -impl TmPacketSourceCore for SyncTcpTmSource { +impl PacketSource for SyncTcpTmSource { type Error = (); fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result { @@ -69,58 +71,55 @@ impl TmPacketSourceCore for SyncTcpTmSource { } } -pub type TcpServerType = TcpSpacepacketsServer< - (), - CcsdsError, - SyncTcpTmSource, - CcsdsDistributor, MpscErrorType>, - HashSet, ->; +#[derive(Default)] +pub struct ConnectionFinishedHandler {} -pub struct TcpTask< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static, -> { - server: TcpServerType, +impl HandledConnectionHandler for ConnectionFinishedHandler { + fn handled_connection(&mut self, info: satrs::hal::std::tcp_server::HandledConnectionInfo) { + info!( + "Served {} TMs and {} TCs for client {:?}", + info.num_sent_tms, info.num_received_tcs, info.addr + ); + } } -impl< - TcSource: ReceivesCcsdsTc - + ReceivesEcssPusTc - + Clone - + Send - + 'static, - MpscErrorType: 'static + core::fmt::Debug, - > TcpTask -{ +pub type TcpServer = TcpSpacepacketsServer< + SyncTcpTmSource, + mpsc::Sender, + SimpleSpValidator, + ConnectionFinishedHandler, + (), + GenericSendError, +>; + +pub struct TcpTask(pub TcpServer); + +impl TcpTask { pub fn new( cfg: ServerConfig, tm_source: SyncTcpTmSource, - tc_receiver: CcsdsDistributor, MpscErrorType>, - packet_id_lookup: HashSet, + tc_sender: mpsc::Sender, + valid_ids: Vec, + stop_signal: Arc, ) -> Result { - Ok(Self { - server: TcpSpacepacketsServer::new(cfg, tm_source, tc_receiver, packet_id_lookup)?, - }) + Ok(Self(TcpSpacepacketsServer::new( + cfg, + tm_source, + tc_sender, + SimpleSpValidator::new(TcpComponent::Server, valid_ids), + ConnectionFinishedHandler::default(), + Some(stop_signal), + )?)) } pub fn periodic_operation(&mut self) { - loop { - let result = self.server.handle_next_connection(); - match result { - Ok(conn_result) => { - info!( - "Served {} TMs and {} TCs for client {:?}", - conn_result.num_sent_tms, conn_result.num_received_tcs, conn_result.addr - ); - } - Err(e) => { - warn!("TCP server error: {e:?}"); - } + let result = self + .0 + .handle_all_connections(Some(Duration::from_millis(STOP_CHECK_FREQUENCY))); + match result { + Ok(_conn_result) => (), + Err(e) => { + warn!("TCP server error: {e:?}"); } } } diff --git a/src/interface/tcp_spp_client.rs b/src/interface/tcp_spp_client.rs new file mode 100644 index 0000000..847f000 --- /dev/null +++ b/src/interface/tcp_spp_client.rs @@ -0,0 +1,191 @@ +use std::io::{self, Read, Write}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::mpsc; +use std::time::Duration; + +use mio::net::TcpStream; +use mio::{Events, Interest, Poll, Token}; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; +use ops_sat_rs::config::{SPP_CLIENT_WIRETAPPING_RX, TCP_SPP_SERVER_PORT}; +use satrs::encoding::ccsds::parse_buffer_for_ccsds_space_packets; +use satrs::queue::GenericSendError; +use satrs::spacepackets::PacketId; +use satrs::tmtc::PacketAsVec; +use satrs::ComponentId; +use thiserror::Error; + +use super::{SimpleSpValidator, TcpComponent}; + +#[derive(Debug, Error)] +pub enum PacketForwardingError { + #[error("send error: {0}")] + Send(#[from] GenericSendError), + #[error("io error: {0}")] + Io(#[from] io::Error), +} + +pub struct TcpSppClient { + id: ComponentId, + poll: Poll, + events: Events, + // Optional to allow periodic reconnection attempts on the TCP server. + client: Option, + read_buf: [u8; 4096], + tm_tcp_client_rx: mpsc::Receiver, + server_addr: SocketAddr, + tc_source_tx: mpsc::Sender, + validator: SimpleSpValidator, +} + +impl TcpSppClient { + pub fn new( + id: ComponentId, + tc_source_tx: mpsc::Sender, + tm_tcp_client_rx: mpsc::Receiver, + valid_ids: &'static [PacketId], + ) -> io::Result { + let mut poll = Poll::new()?; + let events = Events::with_capacity(128); + let server_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 1)), TCP_SPP_SERVER_PORT); + let client = Self::attempt_connection(&mut poll, &server_addr); + if client.is_err() { + log::warn!( + "connection to TCP server {} failed: {}", + server_addr, + client.unwrap_err() + ); + return Ok(Self { + id, + poll, + events, + client: None, + read_buf: [0; 4096], + server_addr, + tm_tcp_client_rx, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }); + } + Ok(Self { + id, + poll, + events, + client: Some(client.unwrap()), + read_buf: [0; 4096], + server_addr, + tm_tcp_client_rx, + tc_source_tx, + validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()), + }) + } + + pub fn attempt_connection(poll: &mut Poll, server_addr: &SocketAddr) -> io::Result { + let mut client = TcpStream::connect(*server_addr)?; + poll.registry().register( + &mut client, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )?; + Ok(client) + } + + pub fn periodic_operation(&mut self) -> Result<(), PacketForwardingError> { + if self.client.is_some() { + return self.perform_regular_operation(); + } else { + let client_result = Self::attempt_connection(&mut self.poll, &self.server_addr); + match client_result { + Ok(client) => { + self.client = Some(client); + self.perform_regular_operation()?; + } + Err(ref e) => { + log::warn!( + "connection to TCP server {} failed: {}", + self.server_addr, + e + ); + } + } + } + Ok(()) + } + + pub fn perform_regular_operation(&mut self) -> Result<(), PacketForwardingError> { + self.poll.poll( + &mut self.events, + Some(Duration::from_millis(STOP_CHECK_FREQUENCY)), + )?; + let events: Vec = self.events.iter().cloned().collect(); + for event in events { + if event.token() == Token(0) { + if event.is_readable() { + self.read_from_server()?; + } + if event.is_writable() { + self.write_to_server()?; + } + } + } + Ok(()) + } + + pub fn read_from_server(&mut self) -> Result<(), PacketForwardingError> { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + match client.read(&mut self.read_buf) { + Ok(0) => return Err(io::Error::from(io::ErrorKind::BrokenPipe).into()), + Ok(read_bytes) => self.handle_read_bytstream(read_bytes)?, + Err(e) => return Err(e.into()), + } + Ok(()) + } + + pub fn write_to_server(&mut self) -> io::Result<()> { + let client = self + .client + .as_mut() + .expect("TCP stream invalid when it should not be"); + loop { + match self.tm_tcp_client_rx.try_recv() { + Ok(tm) => { + client.write_all(&tm.packet)?; + } + Err(e) => match e { + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + log::error!("TM sender to TCP client has disconnected"); + break; + } + }, + } + } + Ok(()) + } + + pub fn handle_read_bytstream( + &mut self, + read_bytes: usize, + ) -> Result<(), PacketForwardingError> { + let mut dummy = 0; + if SPP_CLIENT_WIRETAPPING_RX { + log::debug!( + "received {} bytes on TCP client: {:x?}", + read_bytes, + &self.read_buf[..read_bytes] + ); + } + // This parser is able to deal with broken tail packets, but we ignore those for now.. + parse_buffer_for_ccsds_space_packets( + &mut self.read_buf[..read_bytes], + &self.validator, + self.id, + &self.tc_source_tx, + &mut dummy, + )?; + Ok(()) + } +} diff --git a/src/interface/udp.rs b/src/interface/udp_server.rs similarity index 62% rename from src/interface/udp.rs rename to src/interface/udp_server.rs index 5c45e9e..9a6cfb8 100644 --- a/src/interface/udp.rs +++ b/src/interface/udp_server.rs @@ -2,18 +2,18 @@ use std::net::{SocketAddr, UdpSocket}; use std::sync::mpsc; use log::{info, warn}; -use satrs::pus::PusTmAsVec; -use satrs::{ - hal::std::udp_server::{ReceiveResult, UdpTcServer}, - tmtc::CcsdsError, -}; +use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer}; +use satrs::queue::GenericSendError; +use satrs::tmtc::PacketAsVec; + +use crate::pus::HandlingStatus; pub trait UdpTmHandler { fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr); } pub struct DynamicUdpTmHandler { - pub tm_rx: mpsc::Receiver, + pub tm_rx: mpsc::Receiver, } impl UdpTmHandler for DynamicUdpTmHandler { @@ -34,42 +34,39 @@ impl UdpTmHandler for DynamicUdpTmHandler { } } -pub struct UdpTmtcServer { - pub udp_tc_server: UdpTcServer>, +pub struct UdpTmtcServer { + pub udp_tc_server: UdpTcServer, GenericSendError>, pub tm_handler: TmHandler, } -impl - UdpTmtcServer -{ +impl UdpTmtcServer { pub fn periodic_operation(&mut self) { - while self.poll_tc_server() {} + loop { + if self.poll_tc_server() == HandlingStatus::Empty { + break; + } + } if let Some(recv_addr) = self.udp_tc_server.last_sender() { self.tm_handler .send_tm_to_udp_client(&self.udp_tc_server.socket, &recv_addr); } } - fn poll_tc_server(&mut self) -> bool { + fn poll_tc_server(&mut self) -> HandlingStatus { match self.udp_tc_server.try_recv_tc() { - Ok(_) => true, - Err(e) => match e { - ReceiveResult::ReceiverError(e) => match e { - CcsdsError::ByteConversionError(e) => { - warn!("packet error: {e:?}"); - true + Ok(_) => HandlingStatus::HandledOne, + Err(e) => { + match e { + ReceiveResult::NothingReceived => (), + ReceiveResult::Io(io_error) => { + warn!("Error receiving TC from UDP server: {io_error}"); } - CcsdsError::CustomError(e) => { - warn!("mpsc custom error {e:?}"); - true + ReceiveResult::Send(send_error) => { + warn!("error sending TM to UDP client: {send_error}"); } - }, - ReceiveResult::IoError(e) => { - warn!("IO error {e}"); - false } - ReceiveResult::NothingReceived => false, - }, + HandlingStatus::Empty + } } } } @@ -79,29 +76,35 @@ mod tests { use std::{ collections::VecDeque, net::IpAddr, - sync::{Arc, Mutex}, + sync::{mpsc::TryRecvError, Arc, Mutex}, }; + use ops_sat_rs::config::{EXPERIMENT_APID, OBSW_SERVER_ADDR}; use satrs::{ spacepackets::{ ecss::{tc::PusTcCreator, WritablePusPacket}, SpHeader, }, - tmtc::ReceivesTcCore, + tmtc::PacketSenderRaw, + ComponentId, }; - use ops_sat_rs::config::{components, OBSW_SERVER_ADDR}; use super::*; + const UDP_SERVER_ID: ComponentId = 0x05; + #[derive(Default, Debug, Clone)] pub struct TestReceiver { - tc_vec: Arc>>>, + tc_vec: Arc>>, } - impl ReceivesTcCore for TestReceiver { - type Error = CcsdsError<()>; - fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.tc_vec.lock().unwrap().push_back(tc_raw.to_vec()); + impl PacketSenderRaw for TestReceiver { + type Error = (); + fn send_packet(&self, sender_id: ComponentId, packet: &[u8]) -> Result<(), Self::Error> { + self.tc_vec + .lock() + .unwrap() + .push_back(PacketAsVec::new(sender_id, packet.to_vec())); Ok(()) } } @@ -120,9 +123,8 @@ mod tests { #[test] fn test_basic() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let (tx, rx) = mpsc::channel(); + let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); let mut udp_dyn_server = UdpTmtcServer { @@ -130,16 +132,14 @@ mod tests { tm_handler, }; udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); - assert!(tm_handler_calls.lock().unwrap().is_empty()); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); } #[test] fn test_transactions() { let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), 0); - let test_receiver = TestReceiver::default(); - let tc_queue = test_receiver.tc_vec.clone(); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(test_receiver)).unwrap(); + let (tx, rx) = mpsc::channel(); + let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap(); let server_addr = udp_tc_server.socket.local_addr().unwrap(); let tm_handler = TestTmHandler::default(); let tm_handler_calls = tm_handler.addrs_to_send_to.clone(); @@ -147,7 +147,7 @@ mod tests { udp_tc_server, tm_handler, }; - let sph = SpHeader::new_for_unseg_tc(components::Apid::GenericPus as u16, 0, 0); + let sph = SpHeader::new_for_unseg_tc(EXPERIMENT_APID, 0, 0); let ping_tc = PusTcCreator::new_simple(sph, 17, 1, &[], true) .to_vec() .unwrap(); @@ -157,10 +157,9 @@ mod tests { client.send(&ping_tc).unwrap(); udp_dyn_server.periodic_operation(); { - let mut tc_queue = tc_queue.lock().unwrap(); - assert!(!tc_queue.is_empty()); - let received_tc = tc_queue.pop_front().unwrap(); - assert_eq!(received_tc, ping_tc); + let packet_with_sender = rx.try_recv().unwrap(); + assert_eq!(packet_with_sender.packet, ping_tc); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); } { @@ -171,7 +170,7 @@ mod tests { assert_eq!(received_addr, client_addr); } udp_dyn_server.periodic_operation(); - assert!(tc_queue.lock().unwrap().is_empty()); + matches!(rx.try_recv(), Err(TryRecvError::Empty)); // Still tries to send to the same client. { let mut tm_handler_calls = tm_handler_calls.lock().unwrap(); diff --git a/src/main.rs b/src/main.rs index 2db5ca5..afc0843 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,73 +1,83 @@ use std::{ net::{IpAddr, SocketAddr}, - sync::mpsc, + sync::{atomic::AtomicBool, mpsc, Arc}, thread, time::Duration, }; use log::info; -use ops_sat_rs::config::tasks::FREQ_MS_PUS_STACK; use ops_sat_rs::config::{ - tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, PACKET_ID_VALIDATOR, SERVER_PORT, -}; -use satrs::{ - hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}, - tmtc::CcsdsDistributor, + components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, + tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK}, + VALID_PACKET_ID_LIST, }; +use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; +use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; -use crate::pus::stack::PusStack; -use crate::pus::test::create_test_service_dynamic; -use crate::pus::{PusReceiver, PusTcMpscRouter}; -use crate::tm_funnel::TmFunnelDynamic; -use crate::tmtc::TcSourceTaskDynamic; +use crate::tmtc::tc_source::TcSourceTaskDynamic; +use crate::tmtc::tm_sink::TmFunnelDynamic; +use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{ - ccsds::CcsdsReceiver, - interface::tcp::{SyncTcpTmSource, TcpTask}, - interface::udp::{DynamicUdpTmHandler, UdpTmtcServer}, + interface::tcp_server::{SyncTcpTmSource, TcpTask}, + interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, logger::setup_logger, - tmtc::PusTcSourceProviderDynamic, +}; +use crate::{ + interface::tcp_spp_client::TcpSppClient, + pus::{PusTcDistributor, PusTcMpscRouter}, +}; +use crate::{ + pus::{action::create_action_service, stack::PusStack}, + requests::GenericRequestRouter, }; -mod ccsds; +mod controller; mod interface; mod logger; mod pus; mod requests; -mod tm_funnel; mod tmtc; -#[allow(dead_code)] fn main() { setup_logger().expect("setting up logging with fern failed"); - println!("OPS-SAT Rust experiment OBSW"); + println!("OPS-SAT Rust Experiment OBSW"); + + let stop_signal = Arc::new(AtomicBool::new(false)); let (tc_source_tx, tc_source_rx) = mpsc::channel(); let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); - let (tm_server_tx, tm_server_rx) = mpsc::channel(); - - let tc_source = PusTcSourceProviderDynamic(tc_source_tx); + let (tm_tcp_server_tx, tm_tcp_server_rx) = mpsc::channel(); + let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel(); // let (pus_event_tx, pus_event_rx) = mpsc::channel(); // let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); // let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); - // let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); // let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); - // let (_pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); + let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); // let (pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); // let (pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); + let (controller_composite_tx, controller_composite_rx) = mpsc::channel(); + // let (controller_action_reply_tx, controller_action_reply_rx) = mpsc::channel(); + + // Some request are targetable. This map is used to retrieve sender handles based on a target ID. + let mut request_map = GenericRequestRouter::default(); + request_map + .composite_router_map + .insert(CONTROLLER_ID.id(), controller_composite_tx); let pus_router = PusTcMpscRouter { test_tc_sender: pus_test_tx, // event_tc_sender: pus_event_tx, // sched_tc_sender: pus_sched_tx, // hk_tc_sender: pus_hk_tx, - // action_tc_sender: pus_action_tx, + action_tc_sender: pus_action_tx, // mode_tc_sender: pus_mode_tx, }; - let pus_test_service = create_test_service_dynamic( + let pus_test_service = create_test_service( tm_funnel_tx.clone(), // event_handler.clone_event_sender(), pus_test_rx, @@ -81,12 +91,12 @@ fn main() { // // let pus_event_service = // create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx); - // let pus_action_service = create_action_service_dynamic( - // tm_funnel_tx.clone(), - // pus_action_rx, - // request_map.clone(), - // pus_action_reply_rx, - // ); + let pus_action_service = create_action_service( + tm_funnel_tx.clone(), + pus_action_rx, + request_map.clone(), + pus_action_reply_rx, + ); // let pus_hk_service = create_hk_service_dynamic( // tm_funnel_tx.clone(), // pus_hk_rx, @@ -103,89 +113,165 @@ fn main() { pus_test_service, // pus_hk_service, // pus_event_service, - // pus_action_service, + pus_action_service, // pus_scheduler_service, // pus_mode_service, ); - let ccsds_receiver = CcsdsReceiver { tc_source }; - let mut tmtc_task = TcSourceTaskDynamic::new( tc_source_rx, - PusReceiver::new(tm_funnel_tx.clone(), pus_router), + PusTcDistributor::new(tm_funnel_tx.clone(), pus_router), ); let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let udp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver.clone()); - let udp_tc_server = UdpTcServer::new(sock_addr, 2048, Box::new(udp_ccsds_distributor)) + let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone()) .expect("creating UDP TMTC server failed"); let mut udp_tmtc_server = UdpTmtcServer { udp_tc_server, tm_handler: DynamicUdpTmHandler { - tm_rx: tm_server_rx, + tm_rx: tm_tcp_server_rx, }, }; - let tcp_ccsds_distributor = CcsdsDistributor::new(ccsds_receiver); - let tcp_server_cfg = ServerConfig::new(sock_addr, Duration::from_millis(400), 4096, 8192); + let tcp_server_cfg = ServerConfig::new( + TCP_SERVER.id(), + sock_addr, + Duration::from_millis(400), + 4096, + 8192, + ); let sync_tm_tcp_source = SyncTcpTmSource::new(200); let mut tcp_server = TcpTask::new( tcp_server_cfg, sync_tm_tcp_source.clone(), - tcp_ccsds_distributor, - PACKET_ID_VALIDATOR.clone(), + tc_source_tx.clone(), + VALID_PACKET_ID_LIST.to_vec(), + stop_signal.clone(), ) .expect("tcp server creation failed"); - let mut tm_funnel = TmFunnelDynamic::new(sync_tm_tcp_source, tm_funnel_rx, tm_server_tx); + let mut tm_funnel = TmFunnelDynamic::new( + sync_tm_tcp_source, + tm_funnel_rx, + tm_tcp_server_tx, + tm_tcp_client_tx, + stop_signal.clone(), + ); + + let mut controller = ExperimentController::new( + controller_composite_rx, + pus_action_reply_tx, + stop_signal.clone(), + ); + + let mut tcp_spp_client = TcpSppClient::new( + TCP_SPP_CLIENT.id(), + tc_source_tx, + tm_tcp_client_rx, + VALID_PACKET_ID_LIST, + ) + .expect("creating TCP SPP client failed"); + + info!("Starting CTRL task"); + let ctrl_stop_signal = stop_signal.clone(); + let jh_ctrl_thread = thread::Builder::new() + .name("ops-sat ctrl".to_string()) + .spawn(move || loop { + controller.perform_operation(); + if ctrl_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + thread::sleep(Duration::from_millis(FREQ_MS_CTRL)); + }) + .unwrap(); info!("Starting TMTC and UDP task"); + let tmtc_stop_signal = stop_signal.clone(); let jh_udp_tmtc = thread::Builder::new() - .name("TMTC and UDP".to_string()) + .name("ops-sat tmtc-udp".to_string()) .spawn(move || { info!("Running UDP server on port {SERVER_PORT}"); loop { udp_tmtc_server.periodic_operation(); tmtc_task.periodic_operation(); + if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_UDP_TMTC)); } }) .unwrap(); - info!("Starting TCP task"); - let jh_tcp = thread::Builder::new() - .name("TCP".to_string()) + let tcp_server_stop_signal = stop_signal.clone(); + info!("Starting TCP server task"); + let jh_tcp_server = thread::Builder::new() + .name("ops-sat tcp-server".to_string()) .spawn(move || { info!("Running TCP server on port {SERVER_PORT}"); loop { tcp_server.periodic_operation(); + if tcp_server_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + }) + .unwrap(); + + // We could also move this to the existing TCP server thread, but we would have to adapt + // the server code for this so we do not block anymore and we pause manually if both the client + // and server are IDLE and have nothing to do.. + let tcp_client_stop_signal = stop_signal.clone(); + info!("Starting TCP SPP client task"); + let jh_tcp_client = thread::Builder::new() + .name("ops-sat tcp-client".to_string()) + .spawn(move || { + info!("Running TCP SPP client"); + loop { + let _result = tcp_spp_client.periodic_operation(); + if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } } }) .unwrap(); info!("Starting TM funnel task"); + let funnel_stop_signal = stop_signal.clone(); let jh_tm_funnel = thread::Builder::new() - .name("TM Funnel".to_string()) + .name("ops-sat tm-funnel".to_string()) .spawn(move || loop { tm_funnel.operation(); + if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } }) .unwrap(); info!("Starting PUS handler thread"); + let pus_stop_signal = stop_signal.clone(); let jh_pus_handler = thread::Builder::new() - .name("PUS".to_string()) + .name("ops-sat pus".to_string()) .spawn(move || loop { pus_stack.periodic_operation(); + if pus_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } thread::sleep(Duration::from_millis(FREQ_MS_PUS_STACK)); }) .unwrap(); + jh_ctrl_thread + .join() + .expect("Joining Controller thread failed"); jh_udp_tmtc .join() .expect("Joining UDP TMTC server thread failed"); - jh_tcp + jh_tcp_server .join() .expect("Joining TCP TMTC server thread failed"); + jh_tcp_client + .join() + .expect("Joining TCP TMTC client thread failed"); jh_tm_funnel .join() .expect("Joining TM Funnel thread failed"); diff --git a/src/pus/action.rs b/src/pus/action.rs new file mode 100644 index 0000000..cd2472f --- /dev/null +++ b/src/pus/action.rs @@ -0,0 +1,716 @@ +use log::{error, warn}; +use ops_sat_rs::config::components::PUS_ACTION_SERVICE; +use ops_sat_rs::config::tmtc_err; +use satrs::action::{ActionRequest, ActionRequestVariant}; +use satrs::params::WritableToBeBytes; +use satrs::pus::action::{ + ActionReplyPus, ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap, +}; +use satrs::pus::verification::{ + FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter, + VerificationReportingProvider, VerificationToken, +}; +use satrs::pus::{ + ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError, + GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, + PusTcToRequestConverter, +}; +use satrs::request::{GenericMessage, UniqueApidTargetId}; +use satrs::spacepackets::ecss::tc::PusTcReader; +use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket}; +use satrs::tmtc::PacketAsVec; +use std::sync::mpsc; +use std::time::Duration; + +use crate::requests::GenericRequestRouter; + +use super::{ + create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus, + PusTargetedRequestService, TargetedPusService, +}; + +pub struct ActionReplyHandler { + fail_data_buf: [u8; 128], +} + +impl Default for ActionReplyHandler { + fn default() -> Self { + Self { + fail_data_buf: [0; 128], + } + } +} + +impl PusReplyHandler for ActionReplyHandler { + type Error = EcssTmtcError; + + fn handle_unrequested_reply( + &mut self, + reply: &GenericMessage, + _tm_sender: &impl EcssTmSender, + ) -> Result<(), Self::Error> { + warn!("received unexpected reply for service 8: {reply:?}"); + Ok(()) + } + + fn handle_reply( + &mut self, + reply: &GenericMessage, + active_request: &ActivePusActionRequestStd, + tm_sender: &(impl EcssTmSender + ?Sized), + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result { + let verif_token: VerificationToken = active_request + .token() + .try_into() + .expect("invalid token state"); + let remove_entry = match &reply.message.variant { + ActionReplyVariant::CompletionFailed { error_code, params } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.completion_failure( + tm_sender, + verif_token, + FailParams::new(time_stamp, error_code, &self.fail_data_buf[..fail_data_len]), + )?; + true + } + ActionReplyVariant::StepFailed { + error_code, + step, + params, + } => { + let mut fail_data_len = 0; + if let Some(params) = params { + fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?; + } + verification_handler.step_failure( + tm_sender, + verif_token, + FailParamsWithStep::new( + time_stamp, + &EcssEnumU16::new(*step), + error_code, + &self.fail_data_buf[..fail_data_len], + ), + )?; + true + } + ActionReplyVariant::Completed => { + verification_handler.completion_success(tm_sender, verif_token, time_stamp)?; + true + } + ActionReplyVariant::StepSuccess { step } => { + verification_handler.step_success( + tm_sender, + &verif_token, + time_stamp, + EcssEnumU16::new(*step), + )?; + false + } + _ => false, + }; + Ok(remove_entry) + } + + fn handle_request_timeout( + &mut self, + active_request: &ActivePusActionRequestStd, + tm_sender: &impl EcssTmSender, + verification_handler: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(), Self::Error> { + generic_pus_request_timeout_handler( + tm_sender, + active_request, + verification_handler, + time_stamp, + "action", + ) + } +} + +#[derive(Default)] +pub struct ActionRequestConverter {} + +impl PusTcToRequestConverter for ActionRequestConverter { + type Error = GenericConversionError; + + fn convert( + &mut self, + token: VerificationToken, + tc: &PusTcReader, + tm_sender: &(impl EcssTmSender + ?Sized), + verif_reporter: &impl VerificationReportingProvider, + time_stamp: &[u8], + ) -> Result<(ActivePusActionRequestStd, ActionRequest), Self::Error> { + let subservice = tc.subservice(); + let user_data = tc.user_data(); + if user_data.len() < 8 { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::NOT_ENOUGH_APP_DATA), + ) + .expect("Sending start failure failed"); + return Err(GenericConversionError::NotEnoughAppData { + expected: 8, + found: user_data.len(), + }); + } + let target_id_and_apid = UniqueApidTargetId::from_pus_tc(tc).unwrap(); + let action_id = u32::from_be_bytes(user_data[4..8].try_into().unwrap()); + if subservice == 128 { + let req_variant = if user_data.len() == 8 { + ActionRequestVariant::NoData + } else { + ActionRequestVariant::VecData(user_data[8..].to_vec()) + }; + Ok(( + ActivePusActionRequestStd::new( + action_id, + target_id_and_apid.into(), + token.into(), + Duration::from_secs(30), + ), + ActionRequest::new(action_id, req_variant), + )) + } else { + verif_reporter + .start_failure( + tm_sender, + token, + FailParams::new_no_fail_data(time_stamp, &tmtc_err::INVALID_PUS_SUBSERVICE), + ) + .expect("Sending start failure failed"); + Err(GenericConversionError::InvalidSubservice(subservice)) + } + } +} + +pub fn create_action_service( + tm_funnel_tx: mpsc::Sender, + pus_action_rx: mpsc::Receiver, + action_router: GenericRequestRouter, + reply_receiver: mpsc::Receiver>, +) -> ActionServiceWrapper { + let action_request_handler = PusTargetedRequestService::new( + PusServiceHelper::new( + PUS_ACTION_SERVICE.id(), + pus_action_rx, + tm_funnel_tx, + create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid), + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + action_router, + reply_receiver, + ); + ActionServiceWrapper { + service: action_request_handler, + } +} + +pub struct ActionServiceWrapper { + pub(crate) service: PusTargetedRequestService< + VerificationReporter, + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + ActionReplyPus, + >, +} + +impl TargetedPusService for ActionServiceWrapper { + /// Returns [true] if the packet handling is finished. + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus { + match self.service.poll_and_handle_next_tc(time_stamp) { + Ok(result) => match result { + PusPacketHandlerResult::RequestHandled => {} + PusPacketHandlerResult::RequestHandledPartialSuccess(e) => { + warn!("PUS 8 partial packet handling success: {e:?}") + } + PusPacketHandlerResult::CustomSubservice(invalid, _) => { + warn!("PUS 8 invalid subservice {invalid}"); + } + PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => { + warn!("PUS 8 subservice {subservice} not implemented"); + } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, + }, + Err(error) => { + error!("PUS packet handling error: {error:?}"); + return HandlingStatus::Empty; + } + } + HandlingStatus::HandledOne + } + + fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus { + // This only fails if all senders disconnected. Treat it like an empty queue. + self.service + .poll_and_check_next_reply(time_stamp) + .unwrap_or_else(|e| { + warn!("PUS 8: Handling reply failed with error {e:?}"); + HandlingStatus::Empty + }) + } + + fn check_for_request_timeouts(&mut self) { + self.service.check_for_request_timeouts(); + } +} + +#[cfg(test)] +mod tests { + use satrs::pus::test_util::{ + TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1, + }; + use satrs::pus::verification; + use satrs::pus::verification::test_util::TestVerificationReporter; + use satrs::request::MessageMetadata; + use satrs::ComponentId; + use satrs::{ + res_code::ResultU16, + spacepackets::{ + ecss::{ + tc::{PusTcCreator, PusTcSecondaryHeader}, + tm::PusTmReader, + WritablePusPacket, + }, + SpHeader, + }, + }; + + use crate::{ + pus::tests::{PusConverterTestbench, ReplyHandlerTestbench, TargetedPusRequestTestbench}, + requests::CompositeRequest, + }; + + use super::*; + + impl + TargetedPusRequestTestbench< + ActionRequestConverter, + ActionReplyHandler, + DefaultActiveActionRequestMap, + ActivePusActionRequestStd, + ActionRequest, + ActionReplyPus, + > + { + pub fn new_for_action(owner_id: ComponentId, target_id: ComponentId) -> Self { + let _ = env_logger::builder().is_test(true).try_init(); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (pus_action_tx, pus_action_rx) = mpsc::channel(); + let (action_reply_tx, action_reply_rx) = mpsc::channel(); + let (action_req_tx, action_req_rx) = mpsc::channel(); + let verif_reporter = TestVerificationReporter::new(owner_id); + let mut generic_req_router = GenericRequestRouter::default(); + generic_req_router + .composite_router_map + .insert(target_id, action_req_tx); + Self { + service: PusTargetedRequestService::new( + PusServiceHelper::new( + owner_id, + pus_action_rx, + tm_funnel_tx.clone(), + verif_reporter, + EcssTcInVecConverter::default(), + ), + ActionRequestConverter::default(), + DefaultActiveActionRequestMap::default(), + ActionReplyHandler::default(), + generic_req_router, + action_reply_rx, + ), + request_id: None, + pus_packet_tx: pus_action_tx, + tm_funnel_rx, + reply_tx: action_reply_tx, + request_rx: action_req_rx, + } + } + + pub fn verify_packet_started(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_started_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_packet_completed(&self) { + self.service + .service_helper + .common + .verif_reporter + .check_next_is_completion_success( + self.service.service_helper.id(), + self.request_id.expect("request ID not set").into(), + ); + } + + pub fn verify_tm_empty(&self) { + let packet = self.tm_funnel_rx.try_recv(); + if let Err(mpsc::TryRecvError::Empty) = packet { + } else { + let tm = packet.unwrap(); + let unexpected_tm = PusTmReader::new(&tm.packet, 7).unwrap().0; + panic!("unexpected TM packet {unexpected_tm:?}"); + } + } + + pub fn verify_next_tc_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::RequestHandled => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_all_tcs_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_handle_next_tc(time_stamp); + if let Err(e) = result { + panic!("unexpected error {:?}", e); + } + let result = result.unwrap(); + match result { + PusPacketHandlerResult::Empty => (), + _ => panic!("unexpected result {result:?}"), + } + } + + pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::HandledOne); + } + + pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) { + let result = self.service.poll_and_check_next_reply(time_stamp); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), HandlingStatus::Empty); + } + + pub fn add_tc(&mut self, tc: &PusTcCreator) { + self.request_id = Some(verification::RequestId::new(tc).into()); + let token = self.service.service_helper.verif_reporter_mut().add_tc(tc); + let accepted_token = self + .service + .service_helper + .verif_reporter() + .acceptance_success(self.service.service_helper.tm_sender(), token, &[0; 7]) + .expect("TC acceptance failed"); + self.service + .service_helper + .verif_reporter() + .check_next_was_added(accepted_token.request_id()); + let id = self.service.service_helper.id(); + self.service + .service_helper + .verif_reporter() + .check_next_is_acceptance_success(id, accepted_token.request_id()); + self.pus_packet_tx + .send(EcssTcAndToken::new(tc.to_vec().unwrap(), accepted_token)) + .unwrap(); + } + } + + #[test] + fn basic_request() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sp_header = SpHeader::new_from_apid(TEST_APID); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_1.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new(sp_header, sec_header, &app_data, true); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + testbench.verify_next_tc_is_handled_properly(&time_stamp); + testbench.verify_all_tcs_handled(&time_stamp); + + testbench.verify_packet_started(); + + let possible_req = testbench.request_rx.try_recv(); + assert!(possible_req.is_ok()); + let req = possible_req.unwrap(); + if let CompositeRequest::Action(action_req) = req.message { + assert_eq!(action_req.action_id, action_id); + assert_eq!(action_req.variant, ActionRequestVariant::NoData); + let action_reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed); + testbench + .reply_tx + .send(GenericMessage::new(req.requestor_info, action_reply)) + .unwrap(); + } else { + panic!("unexpected request type"); + } + testbench.verify_next_reply_is_handled_properly(&time_stamp); + testbench.verify_all_replies_handled(&time_stamp); + + testbench.verify_packet_completed(); + testbench.verify_tm_empty(); + } + + #[test] + fn basic_request_routing_error() { + let mut testbench = TargetedPusRequestTestbench::new_for_action( + TEST_COMPONENT_ID_0.id(), + TEST_COMPONENT_ID_1.id(), + ); + // Create a basic action request and verify forwarding. + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&0_u32.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + testbench.add_tc(&pus8_packet); + let time_stamp: [u8; 7] = [0; 7]; + + let result = testbench.service.poll_and_handle_next_tc(&time_stamp); + assert!(result.is_err()); + // Verify the correct result and completion failure. + } + + #[test] + fn converter_action_req_no_data() { + let mut testbench = PusConverterTestbench::new( + TEST_COMPONENT_ID_0.raw(), + ActionRequestConverter::default(), + ); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 8] = [0; 8]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::NoData = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!( + active_req.target_id(), + UniqueApidTargetId::new(TEST_APID, TEST_UNIQUE_ID_0).raw() + ); + assert_eq!( + active_req.token().request_id(), + testbench.request_id().unwrap() + ); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn converter_action_req_with_data() { + let mut testbench = + PusConverterTestbench::new(TEST_COMPONENT_ID_0.id(), ActionRequestConverter::default()); + let sec_header = PusTcSecondaryHeader::new_simple(8, 128); + let action_id = 5_u32; + let mut app_data: [u8; 16] = [0; 16]; + // Invalid ID, routing should fail. + app_data[0..4].copy_from_slice(&TEST_UNIQUE_ID_0.to_be_bytes()); + app_data[4..8].copy_from_slice(&action_id.to_be_bytes()); + for i in 0..8 { + app_data[i + 8] = i as u8; + } + let pus8_packet = PusTcCreator::new( + SpHeader::new_from_apid(TEST_APID), + sec_header, + &app_data, + true, + ); + let token = testbench.add_tc(&pus8_packet); + let result = testbench.convert(token, &[], TEST_APID, TEST_UNIQUE_ID_0); + assert!(result.is_ok()); + let (active_req, request) = result.unwrap(); + if let ActionRequestVariant::VecData(vec) = request.variant { + assert_eq!(request.action_id, action_id); + assert_eq!(active_req.action_id, action_id); + assert_eq!(vec, app_data[8..].to_vec()); + } else { + panic!("unexpected action request variant"); + } + } + + #[test] + fn reply_handling_completion_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = ActionReplyPus::new(action_id, ActionReplyVariant::Completed); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_full_completion_success( + TEST_COMPONENT_ID_0.id(), + req_id, + None, + ); + } + + #[test] + fn reply_handling_completion_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = ActionReplyPus::new( + action_id, + ActionReplyVariant::CompletionFailed { + error_code, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.into(), + req_id, + None, + error_code.raw() as u64, + ); + } + + #[test] + fn reply_handling_step_success() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let reply = ActionReplyPus::new(action_id, ActionReplyVariant::StepSuccess { step: 1 }); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + // Entry should not be removed, completion not done yet. + assert!(!result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.raw(), req_id); + testbench + .verif_reporter + .check_next_is_step_success(TEST_COMPONENT_ID_0.raw(), req_id, 1); + } + + #[test] + fn reply_handling_step_failure() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_req) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let active_action_req = + ActivePusActionRequestStd::new_from_common_req(action_id, active_req); + let error_code = ResultU16::new(2, 3); + let reply = ActionReplyPus::new( + action_id, + ActionReplyVariant::StepFailed { + error_code, + step: 1, + params: None, + }, + ); + let generic_reply = GenericMessage::new(MessageMetadata::new(req_id.into(), 0), reply); + let result = testbench.handle_reply(&generic_reply, &active_action_req, &[]); + assert!(result.is_ok()); + assert!(result.unwrap()); + testbench.verif_reporter.check_next_was_added(req_id); + testbench + .verif_reporter + .check_next_is_acceptance_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench + .verif_reporter + .check_next_is_started_success(TEST_COMPONENT_ID_0.id(), req_id); + testbench.verif_reporter.check_next_is_step_failure( + TEST_COMPONENT_ID_0.id(), + req_id, + error_code.raw().into(), + ); + } + + #[test] + fn reply_handling_unrequested_reply() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_reply = ActionReplyPus::new(5_u32, ActionReplyVariant::Completed); + let unrequested_reply = + GenericMessage::new(MessageMetadata::new(10_u32, 15_u64), action_reply); + // Right now this function does not do a lot. We simply check that it does not panic or do + // weird stuff. + let result = testbench.handle_unrequested_reply(&unrequested_reply); + assert!(result.is_ok()); + } + + #[test] + fn reply_handling_reply_timeout() { + let mut testbench = + ReplyHandlerTestbench::new(TEST_COMPONENT_ID_0.id(), ActionReplyHandler::default()); + let action_id = 5_u32; + let (req_id, active_request) = testbench.add_tc(TEST_APID, TEST_UNIQUE_ID_0, &[]); + let result = testbench.handle_request_timeout( + &ActivePusActionRequestStd::new_from_common_req(action_id, active_request), + &[], + ); + assert!(result.is_ok()); + testbench.verif_reporter.assert_completion_failure( + TEST_COMPONENT_ID_0.raw(), + req_id, + None, + tmtc_err::REQUEST_TIMEOUT.raw() as u64, + ); + } +} diff --git a/src/pus/mod.rs b/src/pus/mod.rs index 62deb27..2f88a26 100644 --- a/src/pus/mod.rs +++ b/src/pus/mod.rs @@ -1,8 +1,8 @@ +pub mod action; pub mod stack; pub mod test; use crate::requests::GenericRequestRouter; -use crate::tmtc::MpscStoreAndSendError; use log::warn; use ops_sat_rs::config::components::PUS_ROUTING_SERVICE; use ops_sat_rs::config::{tmtc_err, CustomPusServiceId}; @@ -13,14 +13,15 @@ use satrs::pus::verification::{ }; use satrs::pus::{ ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter, - EcssTcReceiverCore, EcssTmSenderCore, EcssTmtcError, GenericConversionError, - GenericRoutingError, PusPacketHandlerResult, PusPacketHandlingError, PusReplyHandler, - PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, + EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError, + MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError, + PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory, }; -use satrs::queue::GenericReceiveError; +use satrs::queue::{GenericReceiveError, GenericSendError}; use satrs::request::{Apid, GenericMessage, MessageMetadata}; use satrs::spacepackets::ecss::tc::PusTcReader; -use satrs::spacepackets::ecss::PusServiceId; +use satrs::spacepackets::ecss::{PusPacket, PusServiceId}; +use satrs::tmtc::PacketAsVec; use satrs::ComponentId; use std::fmt::Debug; use std::sync::mpsc::{self, Sender}; @@ -53,11 +54,11 @@ pub struct PusTcMpscRouter { // pub event_tc_sender: Sender, // pub sched_tc_sender: Sender, // pub hk_tc_sender: Sender, - // pub action_tc_sender: Sender, + pub action_tc_sender: Sender, // pub mode_tc_sender: Sender, } -pub struct PusReceiver { +pub struct PusTcDistributor { pub id: ComponentId, pub tm_sender: TmSender, pub verif_reporter: VerificationReporter, @@ -65,7 +66,7 @@ pub struct PusReceiver { stamp_helper: TimeStampHelper, } -impl PusReceiver { +impl PusTcDistributor { pub fn new(tm_sender: TmSender, pus_router: PusTcMpscRouter) -> Self { Self { id: PUS_ROUTING_SERVICE.raw(), @@ -81,29 +82,38 @@ impl PusReceiver { pub fn handle_tc_packet( &mut self, - tc_in_memory: TcInMemory, - service: u8, - pus_tc: &PusTcReader, - ) -> Result { - let init_token = self.verif_reporter.add_tc(pus_tc); + sender_id: ComponentId, + tc: Vec, + ) -> Result { + let pus_tc_result = PusTcReader::new(&tc); + if pus_tc_result.is_err() { + log::warn!( + "error creating PUS TC received from {}: {}", + sender_id, + pus_tc_result.unwrap_err() + ); + log::warn!("raw data: {:x?}", tc); + return Ok(PusPacketHandlerResult::RequestHandled); + } + let pus_tc = pus_tc_result.unwrap().0; + let init_token = self.verif_reporter.add_tc(&pus_tc); self.stamp_helper.update_from_now(); let accepted_token = self .verif_reporter .acceptance_success(&self.tm_sender, init_token, self.stamp_helper.stamp()) .expect("Acceptance success failure"); - let service = PusServiceId::try_from(service); + let service = PusServiceId::try_from(pus_tc.service()); + let tc_in_memory = TcInMemory::Vec(PacketAsVec::new(sender_id, tc)); match service { Ok(standard_service) => match standard_service { PusServiceId::Test => self.pus_router.test_tc_sender.send(EcssTcAndToken { tc_in_memory, token: Some(accepted_token.into()), })?, - // PusServiceId::Housekeeping => { - // self.pus_router.hk_tc_sender.send(EcssTcAndToken { - // tc_in_memory, - // token: Some(accepted_token.into()), - // })? - // } + PusServiceId::Action => self.pus_router.action_tc_sender.send(EcssTcAndToken { + tc_in_memory, + token: Some(accepted_token.into()), + })?, // PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { // tc_in_memory, // token: Some(accepted_token.into()), @@ -161,7 +171,7 @@ impl PusReceiver { pub trait TargetedPusService { /// Returns [true] interface the packet handling is finished. - fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> bool; + fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus; fn check_for_request_timeouts(&mut self); } @@ -188,9 +198,6 @@ pub trait TargetedPusService { /// 3. [Self::check_for_request_timeouts] which checks for request timeouts, covering step 7. #[allow(dead_code)] pub struct PusTargetedRequestService< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -199,8 +206,12 @@ pub struct PusTargetedRequestService< RequestType, ReplyType, > { - pub service_helper: - PusServiceHelper, + pub service_helper: PusServiceHelper< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, pub request_router: GenericRequestRouter, pub request_converter: RequestConverter, pub active_request_map: ActiveRequestMap, @@ -209,11 +220,7 @@ pub struct PusTargetedRequestService< phantom: std::marker::PhantomData<(RequestType, ActiveRequestInfo, ReplyType)>, } -#[allow(dead_code)] impl< - TcReceiver: EcssTcReceiverCore, - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, VerificationReporter: VerificationReportingProvider, RequestConverter: PusTcToRequestConverter, ReplyHandler: PusReplyHandler, @@ -223,9 +230,6 @@ impl< ReplyType, > PusTargetedRequestService< - TcReceiver, - TmSender, - TcInMemConverter, VerificationReporter, RequestConverter, ReplyHandler, @@ -239,9 +243,9 @@ where { pub fn new( service_helper: PusServiceHelper< - TcReceiver, - TmSender, - TcInMemConverter, + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, VerificationReporter, >, request_converter: RequestConverter, @@ -442,7 +446,7 @@ where /// and also log the error. #[allow(dead_code)] pub fn generic_pus_request_timeout_handler( - sender: &(impl EcssTmSenderCore + ?Sized), + sender: &(impl EcssTmSender + ?Sized), active_request: &(impl ActiveRequestProvider + Debug), verification_handler: &impl VerificationReportingProvider, time_stamp: &[u8], @@ -466,12 +470,13 @@ pub(crate) mod tests { use std::time::Duration; use satrs::pus::test_util::TEST_COMPONENT_ID_0; - use satrs::pus::{MpscTmAsVecSender, PusTmAsVec, PusTmVariant}; + use satrs::pus::{MpscTmAsVecSender, PusTmVariant}; use satrs::request::RequestId; + use satrs::tmtc::PacketAsVec; use satrs::{ pus::{ verification::test_util::TestVerificationReporter, ActivePusRequestStd, - ActiveRequestMapProvider, EcssTcInVecConverter, MpscTcReceiver, + ActiveRequestMapProvider, }, request::UniqueApidTargetId, spacepackets::{ @@ -496,7 +501,7 @@ pub(crate) mod tests { pub id: ComponentId, pub verif_reporter: TestVerificationReporter, pub reply_handler: ReplyHandler, - pub tm_receiver: mpsc::Receiver, + pub tm_receiver: mpsc::Receiver, pub default_timeout: Duration, tm_sender: MpscTmAsVecSender, phantom: std::marker::PhantomData<(ActiveRequestInfo, Reply)>, @@ -596,7 +601,7 @@ pub(crate) mod tests { /// Dummy sender component which does nothing on the [Self::send_tm] call. /// /// Useful for unit tests. - impl EcssTmSenderCore for DummySender { + impl EcssTmSender for DummySender { fn send_tm(&self, _source_id: ComponentId, _tm: PusTmVariant) -> Result<(), EcssTmtcError> { Ok(()) } @@ -691,9 +696,6 @@ pub(crate) mod tests { ReplyType, > { pub service: PusTargetedRequestService< - MpscTcReceiver, - MpscTmAsVecSender, - EcssTcInVecConverter, TestVerificationReporter, RequestConverter, ReplyHandler, @@ -703,7 +705,7 @@ pub(crate) mod tests { ReplyType, >, pub request_id: Option, - pub tm_funnel_rx: mpsc::Receiver, + pub tm_funnel_rx: mpsc::Receiver, pub pus_packet_tx: mpsc::Sender, pub reply_tx: mpsc::Sender>, pub request_rx: mpsc::Receiver>, diff --git a/src/pus/stack.rs b/src/pus/stack.rs index f5aa1c1..0699046 100644 --- a/src/pus/stack.rs +++ b/src/pus/stack.rs @@ -1,11 +1,9 @@ -// use crate::pus::mode::ModeServiceWrapper; use crate::pus::test::TestCustomServiceWrapper; use crate::pus::HandlingStatus; use derive_new::new; -use satrs::{ - pus::{EcssTcInMemConverter, EcssTmSenderCore}, - spacepackets::time::{cds, TimeWriter}, -}; +use satrs::spacepackets::time::{cds, TimeWriter}; + +use super::{action::ActionServiceWrapper, TargetedPusService}; // use super::{ // action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper, @@ -14,18 +12,16 @@ use satrs::{ // }; #[derive(new)] -pub struct PusStack { - test_srv: TestCustomServiceWrapper, +pub struct PusStack { + test_srv: TestCustomServiceWrapper, // hk_srv_wrapper: HkServiceWrapper, // event_srv: EventServiceWrapper, - // action_srv_wrapper: ActionServiceWrapper, + action_srv_wrapper: ActionServiceWrapper, // schedule_srv: SchedulingServiceWrapper, // mode_srv: ModeServiceWrapper, } -impl - PusStack -{ +impl PusStack { pub fn periodic_operation(&mut self) { // Release all telecommands which reached their release time before calling the service // handlers. @@ -37,24 +33,31 @@ impl loop { let mut nothing_to_do = true; let mut is_srv_finished = - |tc_handling_done: bool, reply_handling_done: Option| { - if !tc_handling_done - || (reply_handling_done.is_some() - && reply_handling_done.unwrap() == HandlingStatus::Empty) + |_srv_id: u8, + tc_handling_status: HandlingStatus, + reply_handling_status: Option| { + if tc_handling_status == HandlingStatus::HandledOne + || (reply_handling_status.is_some() + && reply_handling_status.unwrap() == HandlingStatus::HandledOne) { nothing_to_do = false; } }; - is_srv_finished(self.test_srv.poll_and_handle_next_packet(&time_stamp), None); + is_srv_finished( + 17, + self.test_srv.poll_and_handle_next_packet(&time_stamp), + None, + ); // is_srv_finished(self.schedule_srv.poll_and_handle_next_tc(&time_stamp), None); // is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); - // is_srv_finished( - // self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), - // Some( - // self.action_srv_wrapper - // .poll_and_handle_next_reply(&time_stamp), - // ), - // ); + is_srv_finished( + 8, + self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), + Some( + self.action_srv_wrapper + .poll_and_handle_next_reply(&time_stamp), + ), + ); // is_srv_finished( // self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp), // Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)), @@ -65,7 +68,7 @@ impl // ); if nothing_to_do { // Timeout checking is only done once. - // self.action_srv_wrapper.check_for_request_timeouts(); + self.action_srv_wrapper.check_for_request_timeouts(); // self.hk_srv_wrapper.check_for_request_timeouts(); // self.mode_srv.check_for_request_timeouts(); break; diff --git a/src/pus/test.rs b/src/pus/test.rs index ec63005..8428be7 100644 --- a/src/pus/test.rs +++ b/src/pus/test.rs @@ -6,20 +6,22 @@ use ops_sat_rs::config::tmtc_err; use satrs::pus::test::PusService17TestHandler; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::{ - EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, EcssTmSenderCore, MpscTcReceiver, - MpscTmAsVecSender, PusPacketHandlerResult, PusServiceHelper, PusTmAsVec, + EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender, + PusPacketHandlerResult, PusServiceHelper, }; use satrs::spacepackets::ecss::tc::PusTcReader; use satrs::spacepackets::ecss::PusPacket; use satrs::spacepackets::time::cds::CdsTime; use satrs::spacepackets::time::TimeWriter; +use satrs::tmtc::PacketAsVec; use std::sync::mpsc; -pub fn create_test_service_dynamic( - tm_funnel_tx: mpsc::Sender, - // event_sender: mpsc::Sender, +use super::HandlingStatus; + +pub fn create_test_service( + tm_funnel_tx: mpsc::Sender, pus_test_rx: mpsc::Receiver, -) -> TestCustomServiceWrapper { +) -> TestCustomServiceWrapper { let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( PUS_TEST_SERVICE.id(), pus_test_rx, @@ -33,23 +35,22 @@ pub fn create_test_service_dynamic( } } -pub struct TestCustomServiceWrapper< - TmSender: EcssTmSenderCore, - TcInMemConverter: EcssTcInMemConverter, -> { - pub handler: - PusService17TestHandler, +pub struct TestCustomServiceWrapper { + pub handler: PusService17TestHandler< + MpscTcReceiver, + MpscTmAsVecSender, + EcssTcInVecConverter, + VerificationReporter, + >, // pub test_srv_event_sender: mpsc::Sender, } -impl - TestCustomServiceWrapper -{ - pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> bool { +impl TestCustomServiceWrapper { + pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus { let res = self.handler.poll_and_handle_next_tc(time_stamp); if res.is_err() { warn!("PUS17 handler failed with error {:?}", res.unwrap_err()); - return true; + return HandlingStatus::Empty; } match res.unwrap() { PusPacketHandlerResult::RequestHandled => { @@ -114,10 +115,8 @@ impl .expect("Sending start failure verification failed"); } } - PusPacketHandlerResult::Empty => { - return true; - } + PusPacketHandlerResult::Empty => return HandlingStatus::Empty, } - false + HandlingStatus::HandledOne } } diff --git a/src/requests.rs b/src/requests.rs index 90ed366..67d40fc 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -10,7 +10,7 @@ use satrs::mode::ModeRequest; use satrs::pus::verification::{ FailParams, TcStateAccepted, VerificationReportingProvider, VerificationToken, }; -use satrs::pus::{ActiveRequestProvider, EcssTmSenderCore, GenericRoutingError, PusRequestRouter}; +use satrs::pus::{ActiveRequestProvider, EcssTmSender, GenericRoutingError, PusRequestRouter}; use satrs::queue::GenericSendError; use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId}; use satrs::spacepackets::ecss::tc::PusTcReader; @@ -49,7 +49,7 @@ impl GenericRequestRouter { active_request: &impl ActiveRequestProvider, tc: &PusTcReader, error: GenericRoutingError, - tm_sender: &(impl EcssTmSenderCore + ?Sized), + tm_sender: &(impl EcssTmSender + ?Sized), verif_reporter: &impl VerificationReportingProvider, time_stamp: &[u8], ) { diff --git a/src/tm_funnel.rs b/src/tm_funnel.rs deleted file mode 100644 index 20c9d91..0000000 --- a/src/tm_funnel.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::{ - collections::HashMap, - sync::mpsc::{self}, -}; - -use log::info; -use satrs::pus::PusTmAsVec; -use satrs::{ - seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, - spacepackets::{ - ecss::{tm::PusTmZeroCopyWriter, PusPacket}, - time::cds::MIN_CDS_FIELD_LEN, - CcsdsPacket, - }, -}; - -use crate::interface::tcp::SyncTcpTmSource; - -#[derive(Default)] -pub struct CcsdsSeqCounterMap { - apid_seq_counter_map: HashMap, -} - -impl CcsdsSeqCounterMap { - pub fn get_and_increment(&mut self, apid: u16) -> u16 { - self.apid_seq_counter_map - .entry(apid) - .or_default() - .get_and_increment() - } -} - -pub struct TmFunnelCommon { - seq_counter_map: CcsdsSeqCounterMap, - msg_counter_map: HashMap, - sync_tm_tcp_source: SyncTcpTmSource, -} - -impl TmFunnelCommon { - pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { - Self { - seq_counter_map: Default::default(), - msg_counter_map: Default::default(), - sync_tm_tcp_source, - } - } - - // Applies common packet processing operations for PUS TM packets. This includes setting - // a sequence counter - fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { - // zero_copy_writer.set_apid(PUS_APID); - zero_copy_writer.set_seq_count( - self.seq_counter_map - .get_and_increment(zero_copy_writer.apid()), - ); - let entry = self - .msg_counter_map - .entry(zero_copy_writer.service()) - .or_insert(0); - zero_copy_writer.set_msg_count(*entry); - if *entry == u16::MAX { - *entry = 0; - } else { - *entry += 1; - } - - Self::packet_printout(&zero_copy_writer); - // This operation has to come last! - zero_copy_writer.finish(); - } - - fn packet_printout(tm: &PusTmZeroCopyWriter) { - info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); - } -} - -pub struct TmFunnelDynamic { - common: TmFunnelCommon, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, -} - -impl TmFunnelDynamic { - pub fn new( - sync_tm_tcp_source: SyncTcpTmSource, - tm_funnel_rx: mpsc::Receiver, - tm_server_tx: mpsc::Sender, - ) -> Self { - Self { - common: TmFunnelCommon::new(sync_tm_tcp_source), - tm_funnel_rx, - tm_server_tx, - } - } - - pub fn operation(&mut self) { - if let Ok(mut tm) = self.tm_funnel_rx.recv() { - // Read the TM, set sequence counter and message counter, and finally update - // the CRC. - let zero_copy_writer = PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) - .expect("Creating TM zero copy writer failed"); - self.common.apply_packet_processing(zero_copy_writer); - self.common.sync_tm_tcp_source.add_tm(&tm.packet); - self.tm_server_tx - .send(tm) - .expect("Sending TM to server failed"); - } - } -} diff --git a/src/tmtc.rs b/src/tmtc.rs deleted file mode 100644 index 85eb5e6..0000000 --- a/src/tmtc.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::pus::PusReceiver; -use satrs::pool::{StoreAddr, StoreError}; -use satrs::pus::{EcssTcAndToken, MpscTmAsVecSender}; -use satrs::spacepackets::ecss::PusPacket; -use satrs::{ - pus::ReceivesEcssPusTc, - spacepackets::{ecss::tc::PusTcReader, SpHeader}, - tmtc::ReceivesCcsdsTc, -}; -use std::sync::mpsc::{self, SendError, Sender, TryRecvError}; -use thiserror::Error; - -#[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum MpscStoreAndSendError { - #[error("Store error: {0}")] - Store(#[from] StoreError), - #[error("TC send error: {0}")] - TcSend(#[from] SendError), - #[error("TMTC send error: {0}")] - TmTcSend(#[from] SendError), -} - -// Newtype, can not implement necessary traits on MPSC sender directly because of orphan rules. -#[derive(Clone)] -pub struct PusTcSourceProviderDynamic(pub Sender>); - -impl ReceivesEcssPusTc for PusTcSourceProviderDynamic { - type Error = SendError>; - - fn pass_pus_tc(&mut self, _: &SpHeader, pus_tc: &PusTcReader) -> Result<(), Self::Error> { - self.0.send(pus_tc.raw_data().to_vec())?; - Ok(()) - } -} - -impl ReceivesCcsdsTc for PusTcSourceProviderDynamic { - type Error = mpsc::SendError>; - - fn pass_ccsds(&mut self, _: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { - self.0.send(tc_raw.to_vec())?; - Ok(()) - } -} - -// TC source components where the heap is the backing memory of the received telecommands. -pub struct TcSourceTaskDynamic { - pub tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, -} - -impl TcSourceTaskDynamic { - pub fn new( - tc_receiver: mpsc::Receiver>, - pus_receiver: PusReceiver, - ) -> Self { - Self { - tc_receiver, - pus_receiver, - } - } - - pub fn periodic_operation(&mut self) { - self.poll_tc(); - } - - pub fn poll_tc(&mut self) -> bool { - match self.tc_receiver.try_recv() { - Ok(tc) => match PusTcReader::new(&tc) { - Ok((pus_tc, _)) => { - self.pus_receiver - .handle_tc_packet( - satrs::pus::TcInMemory::Vec(tc.clone()), - pus_tc.service(), - &pus_tc, - ) - .ok(); - true - } - Err(e) => { - log::warn!("error creating PUS TC from raw data: {e}"); - log::warn!("raw data: {:x?}", tc); - true - } - }, - Err(e) => match e { - TryRecvError::Empty => false, - TryRecvError::Disconnected => { - log::warn!("tmtc thread: sender disconnected"); - false - } - }, - } - } -} diff --git a/src/tmtc/mod.rs b/src/tmtc/mod.rs new file mode 100644 index 0000000..bfd24c5 --- /dev/null +++ b/src/tmtc/mod.rs @@ -0,0 +1,2 @@ +pub mod tc_source; +pub mod tm_sink; diff --git a/src/tmtc/tc_source.rs b/src/tmtc/tc_source.rs new file mode 100644 index 0000000..02cf5d0 --- /dev/null +++ b/src/tmtc/tc_source.rs @@ -0,0 +1,47 @@ +use std::sync::mpsc::{self, TryRecvError}; + +use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec}; + +use crate::pus::{HandlingStatus, PusTcDistributor}; + +// TC source components where the heap is the backing memory of the received telecommands. +pub struct TcSourceTaskDynamic { + pub tc_receiver: mpsc::Receiver, + pus_distrib: PusTcDistributor, +} + +impl TcSourceTaskDynamic { + pub fn new( + tc_receiver: mpsc::Receiver, + pus_receiver: PusTcDistributor, + ) -> Self { + Self { + tc_receiver, + pus_distrib: pus_receiver, + } + } + + pub fn periodic_operation(&mut self) { + self.poll_tc(); + } + + pub fn poll_tc(&mut self) -> HandlingStatus { + // Right now, we only expect PUS packets. If any other protocols like CFDP are added at + // a later stage, we probably need to check for the APID before routing the packet. + match self.tc_receiver.try_recv() { + Ok(packet_with_sender) => { + self.pus_distrib + .handle_tc_packet(packet_with_sender.sender_id, packet_with_sender.packet) + .ok(); + HandlingStatus::HandledOne + } + Err(e) => match e { + TryRecvError::Empty => HandlingStatus::Empty, + TryRecvError::Disconnected => { + log::warn!("tmtc thread: sender disconnected"); + HandlingStatus::Empty + } + }, + } + } +} diff --git a/src/tmtc/tm_sink.rs b/src/tmtc/tm_sink.rs new file mode 100644 index 0000000..fdb8043 --- /dev/null +++ b/src/tmtc/tm_sink.rs @@ -0,0 +1,142 @@ +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::{collections::HashMap, sync::mpsc, time::Duration}; + +use log::info; +use ops_sat_rs::config::tasks::STOP_CHECK_FREQUENCY; +use satrs::tmtc::PacketAsVec; +use satrs::{ + seq_count::{CcsdsSimpleSeqCountProvider, SequenceCountProviderCore}, + spacepackets::{ + ecss::{tm::PusTmZeroCopyWriter, PusPacket}, + time::cds::MIN_CDS_FIELD_LEN, + CcsdsPacket, + }, +}; + +use crate::interface::tcp_server::SyncTcpTmSource; + +#[derive(Default)] +pub struct CcsdsSeqCounterMap { + apid_seq_counter_map: HashMap, +} + +impl CcsdsSeqCounterMap { + pub fn get_and_increment(&mut self, apid: u16) -> u16 { + self.apid_seq_counter_map + .entry(apid) + .or_default() + .get_and_increment() + } +} + +pub struct TmFunnelCommon { + seq_counter_map: CcsdsSeqCounterMap, + msg_counter_map: HashMap, + sync_tm_tcp_source: SyncTcpTmSource, +} + +impl TmFunnelCommon { + pub fn new(sync_tm_tcp_source: SyncTcpTmSource) -> Self { + Self { + seq_counter_map: Default::default(), + msg_counter_map: Default::default(), + sync_tm_tcp_source, + } + } + + // Applies common packet processing operations for PUS TM packets. This includes setting + // a sequence counter + fn apply_packet_processing(&mut self, mut zero_copy_writer: PusTmZeroCopyWriter) { + // zero_copy_writer.set_apid(PUS_APID); + zero_copy_writer.set_seq_count( + self.seq_counter_map + .get_and_increment(zero_copy_writer.apid()), + ); + let entry = self + .msg_counter_map + .entry(zero_copy_writer.service()) + .or_insert(0); + zero_copy_writer.set_msg_count(*entry); + if *entry == u16::MAX { + *entry = 0; + } else { + *entry += 1; + } + + Self::packet_printout(&zero_copy_writer); + // This operation has to come last! + zero_copy_writer.finish(); + } + + fn packet_printout(tm: &PusTmZeroCopyWriter) { + info!("Sending PUS TM[{},{}]", tm.service(), tm.subservice()); + } +} + +pub struct TmFunnelDynamic { + common: TmFunnelCommon, + tm_funnel_rx: mpsc::Receiver, + tm_udp_server_tx: mpsc::Sender, + tm_tcp_client_tx: mpsc::Sender, + stop_signal: Arc, +} + +impl TmFunnelDynamic { + pub fn new( + sync_tm_tcp_source: SyncTcpTmSource, + tm_funnel_rx: mpsc::Receiver, + tm_udp_server_tx: mpsc::Sender, + tm_tcp_client_tx: mpsc::Sender, + stop_signal: Arc, + ) -> Self { + Self { + common: TmFunnelCommon::new(sync_tm_tcp_source), + tm_funnel_rx, + tm_udp_server_tx, + tm_tcp_client_tx, + stop_signal, + } + } + + pub fn operation(&mut self) { + loop { + match self + .tm_funnel_rx + .recv_timeout(Duration::from_millis(STOP_CHECK_FREQUENCY)) + { + Ok(mut tm) => { + // Read the TM, set sequence counter and message counter, and finally update + // the CRC. + let zero_copy_writer = + PusTmZeroCopyWriter::new(&mut tm.packet, MIN_CDS_FIELD_LEN) + .expect("Creating TM zero copy writer failed"); + self.common.apply_packet_processing(zero_copy_writer); + self.common.sync_tm_tcp_source.add_tm(&tm.packet); + let result = self.tm_udp_server_tx.send(tm.clone()); + if result.is_err() { + log::error!("TM UDP server has disconnected"); + } + let result = self.tm_tcp_client_tx.send(tm); + if result.is_err() { + log::error!("TM TCP client has disconnected"); + } + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + Err(e) => match e { + mpsc::RecvTimeoutError::Timeout => { + if self.stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + } + mpsc::RecvTimeoutError::Disconnected => { + log::warn!("All TM funnel senders have disconnected"); + break; + } + }, + } + } + } +}