diff --git a/.idea/runConfigurations/Run_obsw_client_example.xml b/.idea/runConfigurations/Run_obsw_client_example.xml new file mode 100644 index 0000000..dc85ad6 --- /dev/null +++ b/.idea/runConfigurations/Run_obsw_client_example.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Run_obsw_example.xml b/.idea/runConfigurations/Run_obsw_example.xml new file mode 100644 index 0000000..ed323c9 --- /dev/null +++ b/.idea/runConfigurations/Run_obsw_example.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 0cefc8b..0e51f1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,24 +15,27 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.18" +version = "0.7.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e" dependencies = [ "memchr", ] [[package]] -name = "atomic-option" -version = "0.1.2" +name = "android_system_properties" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] [[package]] name = "atomic-polyfill" -version = "0.1.8" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14bf7b4f565e5e717d7a7a65b2a05c0b8c96e4db636d6f780f03b15108cdd1b" +checksum = "9c041a8d9751a520ee19656232a18971f18946a7900f1520ee4400002244dd89" dependencies = [ "critical-section", ] @@ -78,17 +81,16 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bumpalo" -version = "3.10.0" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" +checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" [[package]] name = "bus" -version = "2.2.3" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e66e1779f5b1440f1a58220ba3b3ded4427175f0a9fb8d7066521f8b4e8f2b" +checksum = "80cb4625f5b60155ff1018c9d4ce2e38bf5ae3e5780dfab9fa68bb44a6b751e2" dependencies = [ - "atomic-option", "crossbeam-channel", "num_cpus", "parking_lot_core", @@ -100,12 +102,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -114,26 +110,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.20" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6127248204b9aba09a362f6c930ef6a78f2c1b2215f8a7b398c06e1083f17af0" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ - "js-sys", + "iana-time-zone", "num-integer", "num-traits", - "wasm-bindgen", "winapi", ] -[[package]] -name = "cloudabi" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -dependencies = [ - "bitflags", -] - [[package]] name = "cobs" version = "0.2.3" @@ -141,10 +127,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" [[package]] -name = "cortex-m" -version = "0.7.5" +name = "core-foundation-sys" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd20d4ac4aa86f4f75f239d59e542ef67de87cce2c282818dc6e84155d3ea126" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + +[[package]] +name = "cortex-m" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70858629a458fdfd39f9675c4dc309411f2a3f83bede76988d81bf1a0ecee9e0" dependencies = [ "bare-metal 0.2.5", "bitfield", @@ -174,30 +166,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95da181745b56d4bd339530ec393508910c909c784e8962d15d722bacf0bcbcd" dependencies = [ "bare-metal 1.0.0", - "cfg-if 1.0.0", + "cfg-if", "cortex-m", "riscv", ] [[package]] name = "crossbeam-channel" -version = "0.4.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ + "cfg-if", "crossbeam-utils", - "maybe-uninit", ] [[package]] name = "crossbeam-utils" -version = "0.7.2" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ - "autocfg", - "cfg-if 0.1.10", - "lazy_static", + "cfg-if", + "once_cell", ] [[package]] @@ -211,6 +202,17 @@ dependencies = [ "syn", ] +[[package]] +name = "delegate" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "082a24a9967533dc5d743c602157637116fc1b52806d694a5a45e6f32567fcdd" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "downcast-rs" version = "1.2.0" @@ -232,7 +234,8 @@ name = "fsrc-core" version = "0.1.0" dependencies = [ "bus", - "delegate", + "crossbeam-channel", + "delegate 0.8.0", "downcast-rs", "hashbrown", "num-traits", @@ -240,7 +243,6 @@ dependencies = [ "postcard", "serde", "spacepackets", - "thiserror", "zerocopy", ] @@ -248,6 +250,9 @@ dependencies = [ name = "fsrc-example" version = "0.1.0" dependencies = [ + "crossbeam-channel", + "delegate 0.8.0", + "fsrc-core", "spacepackets", ] @@ -257,7 +262,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "wasi", ] @@ -282,9 +287,9 @@ dependencies = [ [[package]] name = "heapless" -version = "0.7.14" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065681e99f9ef7e0e813702a0326aedbcbbde7db5e55f097aedd1bf50b9dca43" +checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" dependencies = [ "atomic-polyfill", "hash32", @@ -303,6 +308,20 @@ dependencies = [ "libc", ] +[[package]] +name = "iana-time-zone" +version = "0.1.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c495f162af0bf17656d0014a0eded5f3cd2f365fdd204548c2869db89359dc7" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "js-sys", + "once_cell", + "wasm-bindgen", + "winapi", +] + [[package]] name = "js-sys" version = "0.3.59" @@ -320,15 +339,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.126" +version = "0.2.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" [[package]] name = "lock_api" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390" dependencies = [ "autocfg", "scopeguard", @@ -340,15 +359,9 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] -[[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - [[package]] name = "memchr" version = "2.5.0" @@ -401,29 +414,28 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.13.1" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" +checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0" [[package]] name = "parking_lot_core" -version = "0.7.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ - "cfg-if 0.1.10", - "cloudabi", + "cfg-if", "libc", "redox_syscall", "smallvec", - "winapi", + "windows-sys", ] [[package]] name = "postcard" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41f5465c5e5a38e04552d8fb53ebcf4f58124ab3bbd0c02add043b33f82792e5" +checksum = "1c2b180dc0bade59f03fd005cb967d3f1e5f69b13922dad0cd6e047cb8af2363" dependencies = [ "cobs", "heapless", @@ -432,33 +444,36 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.39" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.18" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" dependencies = [ "proc-macro2", ] [[package]] name = "redox_syscall" -version = "0.1.57" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] [[package]] name = "regex" -version = "1.5.6" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" dependencies = [ "aho-corasick", "memchr", @@ -467,9 +482,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.26" +version = "0.6.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" [[package]] name = "riscv" @@ -507,7 +522,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.10", + "semver 1.0.13", ] [[package]] @@ -527,9 +542,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.10" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41d061efea015927ac527063765e73601444cdc344ba855bc7bd44578b25e1c" +checksum = "93f6841e709003d68bb2deee8c343572bf446003ec20a583e76f7b15cebf3711" [[package]] name = "semver-parser" @@ -539,18 +554,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.143" +version = "1.0.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e8e5d5b70924f74ff5c6d64d9a5acd91422117c60f48c4e07855238a254553" +checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.143" +version = "1.0.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3d8e8de557aee63c26b85b947f5e59b690d0454c753f3adeb5cd7835ab88391" +checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" dependencies = [ "proc-macro2", "quote", @@ -559,9 +574,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" [[package]] name = "spacepackets" @@ -569,7 +584,7 @@ version = "0.1.0" dependencies = [ "chrono", "crc", - "delegate", + "delegate 0.7.0", "num-traits", "postcard", "serde", @@ -578,9 +593,9 @@ dependencies = [ [[package]] name = "spin" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" dependencies = [ "lock_api", ] @@ -593,9 +608,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "syn" -version = "1.0.96" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf" +checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" dependencies = [ "proc-macro2", "quote", @@ -614,31 +629,11 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "thiserror" -version = "1.0.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" -dependencies = [ - "thiserror-impl", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "unicode-ident" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" +checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" [[package]] name = "unicode-xid" @@ -685,7 +680,7 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "wasm-bindgen-macro", ] @@ -755,6 +750,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "zerocopy" version = "0.6.1" diff --git a/fsrc-core/Cargo.toml b/fsrc-core/Cargo.toml index 392e4f4..8188273 100644 --- a/fsrc-core/Cargo.toml +++ b/fsrc-core/Cargo.toml @@ -6,8 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -thiserror = "1.0" -delegate = "0.7.0" +delegate = "0.8.0" hashbrown = "0.12.3" [dependencies.num-traits] @@ -22,17 +21,23 @@ default-features = false version = "2.2.3" optional = true +[dependencies.crossbeam-channel] +version= "0.5" +default-features = false + [dependencies.spacepackets] path = "../spacepackets" [dev-dependencies] -postcard = { version = "1.0.1", features = ["use-std"] } serde = "1.0.143" zerocopy = "0.6.1" once_cell = "1.13.1" +[dev-dependencies.postcard] +version = "1.0.1" + [features] default = ["std"] -std = ["downcast-rs/std", "alloc", "bus"] +std = ["downcast-rs/std", "alloc", "bus", "postcard/use-std", "crossbeam-channel/std"] alloc = [] diff --git a/fsrc-core/src/hal/host/mod.rs b/fsrc-core/src/hal/host/mod.rs index d9e7799..8057db1 100644 --- a/fsrc-core/src/hal/host/mod.rs +++ b/fsrc-core/src/hal/host/mod.rs @@ -1 +1,2 @@ +//! Helper modules intended to be used on hosts with a full [std] runtime pub mod udp_server; diff --git a/fsrc-core/src/hal/host/udp_server.rs b/fsrc-core/src/hal/host/udp_server.rs index 53a67c5..2c7cd37 100644 --- a/fsrc-core/src/hal/host/udp_server.rs +++ b/fsrc-core/src/hal/host/udp_server.rs @@ -1,40 +1,200 @@ -use crate::hal::host::udp_server::ReceiveResult::{IoError, ReceiverError}; +//! UDP server helper components use crate::tmtc::ReceivesTc; +use std::boxed::Box; +use std::io::{Error, ErrorKind}; use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; +use std::vec; use std::vec::Vec; -pub struct UdpTmtcServer { - socket: UdpSocket, +/// This TC server helper can be used to receive raw PUS telecommands thorough a UDP interface. +/// +/// It caches all received telecomands into a vector. The maximum expected telecommand size should +/// be declared upfront. This avoids dynamic allocation during run-time. The user can specify a TC +/// receiver in form of a special trait object which implements [ReceivesTc]. Please note that the +/// receiver should copy out the received data if it the data is required past the +/// [ReceivesTc::pass_tc] call. +/// +/// # Examples +/// +/// ``` +/// use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +/// use fsrc_core::hal::host::udp_server::UdpTcServer; +/// use fsrc_core::tmtc::ReceivesTc; +/// use spacepackets::SpHeader; +/// use spacepackets::tc::PusTc; +/// +/// #[derive (Default)] +/// struct PingReceiver {} +/// impl ReceivesTc for PingReceiver { +/// type Error = (); +/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { +/// assert_eq!(tc_raw.len(), 13); +/// Ok(()) +/// } +/// } +/// +/// let mut buf = [0; 32]; +/// let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); +/// let ping_receiver = PingReceiver::default(); +/// let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) +/// .expect("Creating UDP TMTC server failed"); +/// let mut sph = SpHeader::tc(0x02, 0, 0).unwrap(); +/// let pus_tc = PusTc::new_simple(&mut sph, 17, 1, None, true); +/// let len = pus_tc +/// .write_to(&mut buf) +/// .expect("Error writing PUS TC packet"); +/// assert_eq!(len, 13); +/// let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed"); +/// client +/// .send_to(&buf[0..len], dest_addr) +/// .expect("Error sending PUS TC via UDP"); +/// ``` +/// +/// The [fsrc-example crate](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-example) +/// server code also includes +/// [example code](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-example/src/bin/obsw/tmtc.rs) +/// on how to use this TC server. It uses the server to receive PUS telecommands on a specific port +/// and then forwards them to a generic CCSDS packet receiver. +pub struct UdpTcServer { + pub socket: UdpSocket, recv_buf: Vec, + sender_addr: Option, tc_receiver: Box>, } +#[derive(Debug)] pub enum ReceiveResult { - IoError(std::io::Error), + NothingReceived, + IoError(Error), ReceiverError(E), } -impl UdpTmtcServer { +impl From for ReceiveResult { + fn from(e: Error) -> Self { + ReceiveResult::IoError(e) + } +} + +impl PartialEq for ReceiveResult { + fn eq(&self, other: &Self) -> bool { + use ReceiveResult::*; + match (self, other) { + (IoError(ref e), IoError(ref other_e)) => e.kind() == other_e.kind(), + (NothingReceived, NothingReceived) => true, + (ReceiverError(e), ReceiverError(other_e)) => e == other_e, + _ => false, + } + } +} + +impl Eq for ReceiveResult {} + +impl UdpTcServer { pub fn new( addr: A, max_recv_size: usize, tc_receiver: Box>, - ) -> Result { - Ok(Self { + ) -> Result { + let server = Self { socket: UdpSocket::bind(addr)?, - recv_buf: Vec::with_capacity(max_recv_size), + recv_buf: vec![0; max_recv_size], + sender_addr: None, tc_receiver, - }) + }; + server.socket.set_nonblocking(true)?; + Ok(server) } - pub fn recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult> { - let res = self - .socket - .recv_from(&mut self.recv_buf) - .map_err(|e| IoError(e))?; + pub fn try_recv_tc(&mut self) -> Result<(usize, SocketAddr), ReceiveResult> { + let res = match self.socket.recv_from(&mut self.recv_buf) { + Ok(res) => res, + Err(e) => { + return if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut { + Err(ReceiveResult::NothingReceived) + } else { + Err(e.into()) + } + } + }; + let (num_bytes, from) = res; + self.sender_addr = Some(from); self.tc_receiver - .pass_tc(&self.recv_buf[0..res.0]) - .map_err(|e| ReceiverError(e))?; + .pass_tc(&self.recv_buf[0..num_bytes]) + .map_err(|e| ReceiveResult::ReceiverError(e))?; Ok(res) } + + pub fn last_sender(&self) -> Option { + self.sender_addr + } +} + +#[cfg(test)] +mod tests { + use crate::hal::host::udp_server::{ReceiveResult, UdpTcServer}; + use crate::tmtc::ReceivesTc; + use spacepackets::tc::PusTc; + use spacepackets::SpHeader; + use std::boxed::Box; + use std::collections::VecDeque; + use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; + use std::vec::Vec; + + #[derive(Default)] + struct PingReceiver { + pub sent_cmds: VecDeque>, + } + + impl ReceivesTc for PingReceiver { + type Error = (); + + fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + let mut sent_data = Vec::new(); + sent_data.extend_from_slice(tc_raw); + self.sent_cmds.push_back(sent_data); + Ok(()) + } + } + + #[test] + fn basic_test() { + let mut buf = [0; 32]; + let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7777); + let ping_receiver = PingReceiver::default(); + let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) + .expect("Creating UDP TMTC server failed"); + let mut sph = SpHeader::tc(0x02, 0, 0).unwrap(); + let pus_tc = PusTc::new_simple(&mut sph, 17, 1, None, true); + let len = pus_tc + .write_to(&mut buf) + .expect("Error writing PUS TC packet"); + let client = UdpSocket::bind("127.0.0.1:7778").expect("Connecting to UDP server failed"); + client + .send_to(&buf[0..len], dest_addr) + .expect("Error sending PUS TC via UDP"); + let local_addr = client.local_addr().unwrap(); + udp_tc_server + .try_recv_tc() + .expect("Error receiving sent telecommand"); + assert_eq!( + udp_tc_server.last_sender().expect("No sender set"), + local_addr + ); + let ping_receiver: &mut PingReceiver = udp_tc_server.tc_receiver.downcast_mut().unwrap(); + assert_eq!(ping_receiver.sent_cmds.len(), 1); + let sent_cmd = ping_receiver.sent_cmds.pop_front().unwrap(); + assert_eq!(sent_cmd, buf[0..len]); + } + + #[test] + fn test_nothing_received() { + let dest_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7779); + let ping_receiver = PingReceiver::default(); + let mut udp_tc_server = UdpTcServer::new(dest_addr, 2048, Box::new(ping_receiver)) + .expect("Creating UDP TMTC server failed"); + let res = udp_tc_server.try_recv_tc(); + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err, ReceiveResult::NothingReceived); + } } diff --git a/fsrc-core/src/hal/mod.rs b/fsrc-core/src/hal/mod.rs index 43769ba..ddaf8f3 100644 --- a/fsrc-core/src/hal/mod.rs +++ b/fsrc-core/src/hal/mod.rs @@ -1,2 +1,3 @@ -#[cfg(feature = "use_std")] +//! # Hardware Abstraction Layer module +#[cfg(feature = "std")] pub mod host; diff --git a/fsrc-core/src/lib.rs b/fsrc-core/src/lib.rs index 70d4b93..02298a0 100644 --- a/fsrc-core/src/lib.rs +++ b/fsrc-core/src/lib.rs @@ -23,6 +23,7 @@ pub mod hal; pub mod objects; #[cfg(feature = "alloc")] pub mod pool; +pub mod pus; pub mod tmtc; extern crate downcast_rs; diff --git a/fsrc-core/src/pool.rs b/fsrc-core/src/pool.rs index af68d90..8411a35 100644 --- a/fsrc-core/src/pool.rs +++ b/fsrc-core/src/pool.rs @@ -6,14 +6,14 @@ //! embedded environments. The pool implementation will also track the size of the data stored //! inside it. //! -//! Transaction with the [pool][LocalPool] are done using a special [address][StoreAddr] type. +//! Transactions with the [pool][LocalPool] are done using a special [address][StoreAddr] type. //! Adding any data to the pool will yield a store address. Modification and read operations are //! done using a reference to a store address. Deletion will consume the store address. //! //! # Example //! //! ``` -//! use fsrc_core::pool::{LocalPool, PoolCfg}; +//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider}; //! //! // 4 buckets of 4 bytes, 2 of 8 bytes and 1 of 16 bytes //! let pool_cfg = PoolCfg::new(vec![(4, 4), (2, 8), (1, 16)]); @@ -78,15 +78,25 @@ use alloc::string::String; use alloc::vec; use alloc::vec::Vec; use delegate::delegate; +#[cfg(feature = "std")] +use std::boxed::Box; +#[cfg(feature = "std")] +use std::sync::{Arc, RwLock}; type NumBlocks = u16; +#[cfg(feature = "std")] +pub type ShareablePoolProvider = Box; +#[cfg(feature = "std")] +pub type SharedPool = Arc>; + /// Configuration structure of the [local pool][LocalPool] /// /// # Parameters /// /// * `cfg`: Vector of tuples which represent a subpool. The first entry in the tuple specifies the /// number of memory blocks in the subpool, the second entry the size of the blocks +#[derive(Clone)] pub struct PoolCfg { cfg: Vec<(NumBlocks, usize)>, } @@ -150,10 +160,52 @@ pub enum StoreError { InternalError(String), } +pub trait PoolProvider { + /// Add new data to the pool. The provider should attempt to reserve a memory block with the + /// appropriate size and then copy the given data to the block. Yields a [StoreAddr] which can + /// be used to access the data stored in the pool + fn add(&mut self, data: &[u8]) -> Result; + + /// The provider should attempt to reserve a free memory block with the appropriate size and + /// then return a mutable reference to it. Yields a [StoreAddr] which can be used to access + /// the data stored in the pool + fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError>; + + /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference + /// to it + fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError>; + + /// This function behaves like [Self::modify], but consumes the provided address and returns a + /// RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read (and modify) the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard; + + /// Read data by yielding a read-only reference given a [StoreAddr] + fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError>; + + /// This function behaves like [Self::read], but consumes the provided address and returns a + /// RAII conformant guard object. + /// + /// Unless the guard [PoolRwGuard::release] method is called, the data for the + /// given address will be deleted automatically when the guard is dropped. + /// This can prevent memory leaks. Users can read the data and release the guard + /// if the data in the store is valid for further processing. If the data is faulty, no + /// manual deletion is necessary when returning from a processing function prematurely. + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard; + + /// Delete data inside the pool given a [StoreAddr] + fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError>; + fn has_element_at(&self, addr: &StoreAddr) -> Result; +} + impl LocalPool { const STORE_FREE: PoolSize = PoolSize::MAX; const MAX_SIZE: PoolSize = Self::STORE_FREE - 1; - /// Create a new local pool from the [given configuration][PoolCfg]. This function will sanitize /// the given configuration as well. pub fn new(mut cfg: PoolCfg) -> LocalPool { @@ -174,96 +226,6 @@ impl LocalPool { local_pool } - /// Add new data to the pool. It will attempt to reserve a memory block with the appropriate - /// size and then copy the given data to the block. Yields a [StoreAddr] which can be used - /// to access the data stored in the pool - pub fn add(&mut self, data: &[u8]) -> Result { - let data_len = data.len(); - if data_len > Self::MAX_SIZE { - return Err(StoreError::DataTooLarge(data_len)); - } - let addr = self.reserve(data_len)?; - self.write(&addr, data)?; - Ok(addr) - } - - /// Reserves a free memory block with the appropriate size and returns a mutable reference - /// to it. Yields a [StoreAddr] which can be used to access the data stored in the pool - pub fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { - if len > Self::MAX_SIZE { - return Err(StoreError::DataTooLarge(len)); - } - let addr = self.reserve(len)?; - let raw_pos = self.raw_pos(&addr).unwrap(); - let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..len]; - Ok((addr, block)) - } - - /// Modify data added previously using a given [StoreAddr] by yielding a mutable reference - /// to it - pub fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); - let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size]; - Ok(block) - } - - /// This function behaves like [Self::modify], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read (and modify) the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - pub fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { - PoolRwGuard::new(self, addr) - } - - /// Read data by yielding a read-only reference given a [StoreAddr] - pub fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { - let curr_size = self.addr_check(addr)?; - let raw_pos = self.raw_pos(addr).unwrap(); - let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; - Ok(block) - } - - /// This function behaves like [Self::read], but consumes the provided address and returns a - /// RAII conformant guard object. - /// - /// Unless the guard [PoolRwGuard::release] method is called, the data for the - /// given address will be deleted automatically when the guard is dropped. - /// This can prevent memory leaks. Users can read the data and release the guard - /// if the data in the store is valid for further processing. If the data is faulty, no - /// manual deletion is necessary when returning from a processing function prematurely. - pub fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { - PoolGuard::new(self, addr) - } - - /// Delete data inside the pool given a [StoreAddr] - pub fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { - self.addr_check(&addr)?; - let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; - let raw_pos = self.raw_pos(&addr).unwrap(); - let block = - &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size]; - let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap(); - size_list[addr.packet_idx as usize] = Self::STORE_FREE; - block.fill(0); - Ok(()) - } - - pub fn has_element_at(&self, addr: &StoreAddr) -> Result { - self.validate_addr(addr)?; - let pool_idx = addr.pool_idx as usize; - let size_list = self.sizes_lists.get(pool_idx).unwrap(); - let curr_size = size_list[addr.packet_idx as usize]; - if curr_size == Self::STORE_FREE { - return Ok(false); - } - Ok(true) - } - fn addr_check(&self, addr: &StoreAddr) -> Result { self.validate_addr(addr)?; let pool_idx = addr.pool_idx as usize; @@ -354,6 +316,73 @@ impl LocalPool { } } +impl PoolProvider for LocalPool { + fn add(&mut self, data: &[u8]) -> Result { + let data_len = data.len(); + if data_len > Self::MAX_SIZE { + return Err(StoreError::DataTooLarge(data_len)); + } + let addr = self.reserve(data_len)?; + self.write(&addr, data)?; + Ok(addr) + } + + fn free_element(&mut self, len: usize) -> Result<(StoreAddr, &mut [u8]), StoreError> { + if len > Self::MAX_SIZE { + return Err(StoreError::DataTooLarge(len)); + } + let addr = self.reserve(len)?; + let raw_pos = self.raw_pos(&addr).unwrap(); + let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + len]; + Ok((addr, block)) + } + + fn modify(&mut self, addr: &StoreAddr) -> Result<&mut [u8], StoreError> { + let curr_size = self.addr_check(addr)?; + let raw_pos = self.raw_pos(addr).unwrap(); + let block = &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..curr_size]; + Ok(block) + } + + fn modify_with_guard(&mut self, addr: StoreAddr) -> PoolRwGuard { + PoolRwGuard::new(self, addr) + } + + fn read(&self, addr: &StoreAddr) -> Result<&[u8], StoreError> { + let curr_size = self.addr_check(addr)?; + let raw_pos = self.raw_pos(addr).unwrap(); + let block = &self.pool.get(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + curr_size]; + Ok(block) + } + + fn read_with_guard(&mut self, addr: StoreAddr) -> PoolGuard { + PoolGuard::new(self, addr) + } + + fn delete(&mut self, addr: StoreAddr) -> Result<(), StoreError> { + self.addr_check(&addr)?; + let block_size = self.pool_cfg.cfg.get(addr.pool_idx as usize).unwrap().1; + let raw_pos = self.raw_pos(&addr).unwrap(); + let block = + &mut self.pool.get_mut(addr.pool_idx as usize).unwrap()[raw_pos..raw_pos + block_size]; + let size_list = self.sizes_lists.get_mut(addr.pool_idx as usize).unwrap(); + size_list[addr.packet_idx as usize] = Self::STORE_FREE; + block.fill(0); + Ok(()) + } + + fn has_element_at(&self, addr: &StoreAddr) -> Result { + self.validate_addr(addr)?; + let pool_idx = addr.pool_idx as usize; + let size_list = self.sizes_lists.get(pool_idx).unwrap(); + let curr_size = size_list[addr.packet_idx as usize]; + if curr_size == Self::STORE_FREE { + return Ok(false); + } + Ok(true) + } +} + pub struct PoolGuard<'a> { pool: &'a mut LocalPool, pub addr: StoreAddr, @@ -417,7 +446,8 @@ impl<'a> PoolRwGuard<'a> { #[cfg(test)] mod tests { use crate::pool::{ - LocalPool, PoolCfg, PoolGuard, PoolRwGuard, StoreAddr, StoreError, StoreIdError, + LocalPool, PoolCfg, PoolGuard, PoolProvider, PoolRwGuard, StoreAddr, StoreError, + StoreIdError, }; use std::vec; diff --git a/fsrc-core/src/pus/mod.rs b/fsrc-core/src/pus/mod.rs new file mode 100644 index 0000000..fb3a317 --- /dev/null +++ b/fsrc-core/src/pus/mod.rs @@ -0,0 +1,7 @@ +//! All PUS support modules +//! +//! Currenty includes: +//! +//! 1. PUS Verification Service 1 module inside [verification]. Requires [alloc] support. +#[cfg(feature = "alloc")] +pub mod verification; diff --git a/fsrc-core/src/pus/verification.rs b/fsrc-core/src/pus/verification.rs new file mode 100644 index 0000000..4af6ad6 --- /dev/null +++ b/fsrc-core/src/pus/verification.rs @@ -0,0 +1,1631 @@ +//! # PUS Verification Service 1 Module +//! +//! This module allows packaging and sending PUS Service 1 packets. It is conforming to section +//! 8 of the PUS standard ECSS-E-ST-70-41C. +//! +//! The core object to report TC verification progress is the [VerificationReporter]. It exposes +//! an API which uses type-state programming to avoid calling the verification steps in +//! an invalid order. +//! +//! # Examples +//! +//! Basic single-threaded example where a full success sequence for a given ping telecommand is +//! executed. Note that the verification part could also be done in a separate thread. +//! +//! ``` +//! use std::sync::{Arc, RwLock}; +//! use std::time::Duration; +//! use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; +//! use fsrc_core::pus::verification::{CrossbeamVerifSender, VerificationReporterCfg, VerificationReporterWithSender}; +//! use spacepackets::ecss::PusPacket; +//! use spacepackets::SpHeader; +//! use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; +//! use spacepackets::tm::PusTm; +//! +//! const EMPTY_STAMP: [u8; 7] = [0; 7]; +//! const TEST_APID: u16 = 0x02; +//! +//! let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); +//! let shared_tm_pool: SharedPool = Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); +//! let (verif_tx, verif_rx) = crossbeam_channel::bounded(10); +//! let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), verif_tx); +//! let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); +//! let mut reporter = VerificationReporterWithSender::new(cfg , Box::new(sender)); +//! +//! let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap(); +//! let tc_header = PusTcSecondaryHeader::new_simple(17, 1); +//! let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); +//! let init_token = reporter.add_tc(&pus_tc_0); +//! +//! // Complete success sequence for a telecommand +//! let accepted_token = reporter.acceptance_success(init_token, &EMPTY_STAMP).unwrap(); +//! let started_token = reporter.start_success(accepted_token, &EMPTY_STAMP).unwrap(); +//! reporter.completion_success(started_token, &EMPTY_STAMP).unwrap(); +//! +//! // Verify it arrives correctly on receiver end +//! let mut tm_buf: [u8; 1024] = [0; 1024]; +//! let mut packet_idx = 0; +//! while packet_idx < 3 { +//! let addr = verif_rx.recv_timeout(Duration::from_millis(10)).unwrap(); +//! let tm_len; +//! { +//! let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); +//! let store_guard = rg.read_with_guard(addr); +//! let slice = store_guard.read().expect("Error reading TM slice"); +//! tm_len = slice.len(); +//! tm_buf[0..tm_len].copy_from_slice(slice); +//! } +//! let (pus_tm, _) = PusTm::new_from_raw_slice(&tm_buf[0..tm_len], 7) +//! .expect("Error reading verification TM"); +//! if packet_idx == 0 { +//! assert_eq!(pus_tm.subservice(), 1); +//! } else if packet_idx == 1 { +//! assert_eq!(pus_tm.subservice(), 3); +//! } else if packet_idx == 2 { +//! assert_eq!(pus_tm.subservice(), 7); +//! } +//! packet_idx += 1; +//! } +//! ``` +//! +//! The [integration test](https://egit.irs.uni-stuttgart.de/rust/fsrc-launchpad/src/branch/main/fsrc-core/tests/verification_test.rs) +//! for the verification module contains examples how this module could be used in a more complex +//! context involving multiple threads +use alloc::boxed::Box; +use alloc::vec; +use alloc::vec::Vec; +use core::fmt::{Display, Formatter}; +use core::hash::{Hash, Hasher}; +use core::marker::PhantomData; +use core::mem::size_of; +use delegate::delegate; +use downcast_rs::{impl_downcast, Downcast}; +use spacepackets::ecss::{EcssEnumeration, PusError}; +use spacepackets::tc::PusTc; +use spacepackets::time::TimestampError; +use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; +use spacepackets::{ByteConversionError, SizeMissmatch, SpHeader}; +use spacepackets::{CcsdsPacket, PacketId, PacketSequenceCtrl}; + +#[cfg(feature = "std")] +pub use stdmod::{ + CrossbeamVerifSender, MpscVerifSender, SharedStdVerifReporterWithSender, + StdVerifReporterWithSender, StdVerifSenderError, +}; + +/// This is a request identifier as specified in 5.4.11.2 c. of the PUS standard +/// This field equivalent to the first two bytes of the CCSDS space packet header. +#[derive(Debug, Eq, Copy, Clone)] +pub struct RequestId { + version_number: u8, + packet_id: PacketId, + psc: PacketSequenceCtrl, +} + +impl Display for RequestId { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#08x}", self.raw()) + } +} + +impl Hash for RequestId { + fn hash(&self, state: &mut H) { + self.raw().hash(state); + } +} + +// Implement manually to satisfy derive_hash_xor_eq lint +impl PartialEq for RequestId { + fn eq(&self, other: &Self) -> bool { + self.version_number == other.version_number + && self.packet_id == other.packet_id + && self.psc == other.psc + } +} + +impl RequestId { + pub const SIZE_AS_BYTES: usize = size_of::(); + + pub fn raw(&self) -> u32 { + ((self.version_number as u32) << 29) + | ((self.packet_id.raw() as u32) << 16) + | self.psc.raw() as u32 + } + + pub fn to_bytes(&self, buf: &mut [u8]) { + let raw = self.raw(); + buf.copy_from_slice(raw.to_be_bytes().as_slice()); + } + + pub fn from_bytes(buf: &[u8]) -> Option { + if buf.len() < 4 { + return None; + } + let raw = u32::from_be_bytes(buf[0..Self::SIZE_AS_BYTES].try_into().unwrap()); + Some(Self { + version_number: ((raw >> 29) & 0b111) as u8, + packet_id: PacketId::from(((raw >> 16) & 0xffff) as u16), + psc: PacketSequenceCtrl::from((raw & 0xffff) as u16), + }) + } +} +impl RequestId { + /// This allows extracting the request ID from a given PUS telecommand. + pub fn new(tc: &PusTc) -> Self { + RequestId { + version_number: tc.ccsds_version(), + packet_id: tc.packet_id(), + psc: tc.psc(), + } + } +} + +/// Generic error type which is also able to wrap a user send error with the user supplied type E. +#[derive(Debug, Clone)] +pub enum VerificationError { + /// Errors related to sending the verification telemetry to a TM recipient + SendError(E), + /// Errors related to the time stamp format of the telemetry + TimestampError(TimestampError), + /// Errors related to byte conversion, for example unsufficient buffer size for given data + ByteConversionError(ByteConversionError), + /// Errors related to PUS packet format + PusError(PusError), +} + +impl From for VerificationError { + fn from(e: ByteConversionError) -> Self { + VerificationError::ByteConversionError(e) + } +} + +/// If a verification operation fails, the passed token will be returned as well. This allows +/// re-trying the operation at a later point. +#[derive(Debug, Clone)] +pub struct VerificationErrorWithToken(VerificationError, VerificationToken); + +/// Generic trait for a user supplied sender object. This sender object is responsible for sending +/// PUS Service 1 Verification Telemetry to a verification TM recipient. The [Downcast] trait +/// is implemented to allow passing the sender as a boxed trait object and still retrieve the +/// concrete type at a later point. +pub trait VerificationSender: Downcast + Send { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; +} + +impl_downcast!(VerificationSender); + +/// Support token to allow type-state programming. This prevents calling the verification +/// steps in an invalid order. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct VerificationToken { + state: PhantomData, + req_id: RequestId, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct StateNone; +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct StateAccepted; +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct StateStarted; + +impl VerificationToken { + fn new(req_id: RequestId) -> VerificationToken { + VerificationToken { + state: PhantomData, + req_id, + } + } + + pub fn req_id(&self) -> RequestId { + self.req_id + } +} + +pub struct VerificationReporterCfg { + pub apid: u16, + pub dest_id: u16, + pub step_field_width: usize, + pub fail_code_field_width: usize, + pub max_fail_data_len: usize, +} + +impl VerificationReporterCfg { + /// Create a new configuration for the verification reporter. This includes following parameters: + /// + /// 1. Destination ID and APID, which could remain constant after construction. These parameters + /// can be tweaked in the reporter after construction. + /// 2. Maximum expected field sizes. The parameters of this configuration struct will be used + /// to determine required maximum buffer sizes and there will be no addition allocation or + /// configurable buffer parameters after [VerificationReporter] construction. + /// + /// This means the user has supply the maximum expected field sizes of verification messages + /// before constructing the reporter. + pub fn new( + apid: u16, + step_field_width: usize, + fail_code_field_width: usize, + max_fail_data_len: usize, + ) -> Self { + Self { + apid, + dest_id: 0, + step_field_width, + fail_code_field_width, + max_fail_data_len, + } + } +} + +/// Composite helper struct to pass failure parameters to the [VerificationReporter] +pub struct FailParams<'a> { + time_stamp: &'a [u8], + failure_code: &'a dyn EcssEnumeration, + failure_data: Option<&'a [u8]>, +} + +impl<'a> FailParams<'a> { + pub fn new( + time_stamp: &'a [u8], + failure_code: &'a impl EcssEnumeration, + failure_data: Option<&'a [u8]>, + ) -> Self { + Self { + time_stamp, + failure_code, + failure_data, + } + } +} + +/// Composite helper struct to pass step failure parameters to the [VerificationReporter] +pub struct FailParamsWithStep<'a> { + bp: FailParams<'a>, + step: &'a dyn EcssEnumeration, +} + +impl<'a> FailParamsWithStep<'a> { + pub fn new( + time_stamp: &'a [u8], + step: &'a impl EcssEnumeration, + failure_code: &'a impl EcssEnumeration, + failure_data: Option<&'a [u8]>, + ) -> Self { + Self { + bp: FailParams::new(time_stamp, failure_code, failure_data), + step, + } + } +} + +/// Primary verification handler. It provides an API to send PUS 1 verification telemetry packets +/// and verify the various steps of telecommand handling as specified in the PUS standard. +pub struct VerificationReporter { + pub apid: u16, + pub dest_id: u16, + msg_count: u16, + source_data_buf: Vec, +} + +impl VerificationReporter { + pub fn new(cfg: VerificationReporterCfg) -> Self { + Self { + apid: cfg.apid, + dest_id: cfg.dest_id, + msg_count: 0, + source_data_buf: vec![ + 0; + RequestId::SIZE_AS_BYTES + + cfg.step_field_width as usize + + cfg.fail_code_field_width as usize + + cfg.max_fail_data_len + ], + } + } + + pub fn allowed_source_data_len(&self) -> usize { + self.source_data_buf.capacity() + } + + /// Initialize verification handling by passing a TC reference. This returns a token required + /// to call the acceptance functions + pub fn add_tc(&mut self, pus_tc: &PusTc) -> VerificationToken { + self.add_tc_with_req_id(RequestId::new(pus_tc)) + } + + /// Same as [Self::add_tc] but pass a request ID instead of the direct telecommand. + /// This can be useful if the executing thread does not have full access to the telecommand. + pub fn add_tc_with_req_id(&mut self, req_id: RequestId) -> VerificationToken { + VerificationToken::::new(req_id) + } + + /// Package and send a PUS TM\[1, 1\] packet, see 8.1.2.1 of the PUS standard + pub fn acceptance_success( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + time_stamp: &[u8], + ) -> Result, VerificationErrorWithToken> { + let tm = self + .create_pus_verif_success_tm( + 1, + 1, + &token.req_id, + time_stamp, + None::<&dyn EcssEnumeration>, + ) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(VerificationToken { + state: PhantomData, + req_id: token.req_id, + }) + } + + /// Package and send a PUS TM\[1, 2\] packet, see 8.1.2.2 of the PUS standard + pub fn acceptance_failure( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + let tm = self + .create_pus_verif_fail_tm(1, 2, &token.req_id, None::<&dyn EcssEnumeration>, ¶ms) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(()) + } + + /// Package and send a PUS TM\[1, 3\] packet, see 8.1.2.3 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::acceptance_success]. + pub fn start_success( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + time_stamp: &[u8], + ) -> Result, VerificationErrorWithToken> { + let tm = self + .create_pus_verif_success_tm( + 1, + 3, + &token.req_id, + time_stamp, + None::<&dyn EcssEnumeration>, + ) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(VerificationToken { + state: PhantomData, + req_id: token.req_id, + }) + } + + /// Package and send a PUS TM\[1, 4\] packet, see 8.1.2.4 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::acceptance_success]. It consumes + /// the token because verification handling is done. + pub fn start_failure( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + let tm = self + .create_pus_verif_fail_tm(1, 4, &token.req_id, None::<&dyn EcssEnumeration>, ¶ms) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(()) + } + + /// Package and send a PUS TM\[1, 5\] packet, see 8.1.2.5 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::start_success]. + pub fn step_success( + &mut self, + token: &VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + time_stamp: &[u8], + step: impl EcssEnumeration, + ) -> Result<(), VerificationError> { + let tm = self.create_pus_verif_success_tm(1, 5, &token.req_id, time_stamp, Some(&step))?; + sender.send_verification_tm(tm)?; + self.msg_count += 1; + Ok(()) + } + + /// Package and send a PUS TM\[1, 6\] packet, see 8.1.2.6 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::start_success]. It consumes the + /// token because verification handling is done. + pub fn step_failure( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + params: FailParamsWithStep, + ) -> Result<(), VerificationErrorWithToken> { + let tm = self + .create_pus_verif_fail_tm(1, 6, &token.req_id, Some(params.step), ¶ms.bp) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(()) + } + + /// Package and send a PUS TM\[1, 7\] packet, see 8.1.2.7 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::start_success]. It consumes the + /// token because verification handling is done. + pub fn completion_success( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + time_stamp: &[u8], + ) -> Result<(), VerificationErrorWithToken> { + let tm = self + .create_pus_verif_success_tm( + 1, + 7, + &token.req_id, + time_stamp, + None::<&dyn EcssEnumeration>, + ) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(()) + } + + /// Package and send a PUS TM\[1, 8\] packet, see 8.1.2.8 of the PUS standard. + /// + /// Requires a token previously acquired by calling [Self::start_success]. It consumes the + /// token because verification handling is done. + pub fn completion_failure( + &mut self, + token: VerificationToken, + sender: &mut (impl VerificationSender + ?Sized), + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + let tm = self + .create_pus_verif_fail_tm(1, 8, &token.req_id, None::<&dyn EcssEnumeration>, ¶ms) + .map_err(|e| VerificationErrorWithToken(e, token))?; + sender + .send_verification_tm(tm) + .map_err(|e| VerificationErrorWithToken(e, token))?; + self.msg_count += 1; + Ok(()) + } + + fn create_pus_verif_success_tm<'a, E>( + &'a mut self, + service: u8, + subservice: u8, + req_id: &RequestId, + time_stamp: &'a [u8], + step: Option<&(impl EcssEnumeration + ?Sized)>, + ) -> Result> { + let mut source_data_len = size_of::(); + if let Some(step) = step { + source_data_len += step.byte_width() as usize; + } + self.source_buffer_large_enough(source_data_len)?; + let mut idx = 0; + req_id.to_bytes(&mut self.source_data_buf[0..RequestId::SIZE_AS_BYTES]); + idx += RequestId::SIZE_AS_BYTES; + if let Some(step) = step { + // Size check was done beforehand + step.to_bytes(&mut self.source_data_buf[idx..idx + step.byte_width() as usize]) + .unwrap(); + } + let mut sp_header = SpHeader::tm(self.apid, 0, 0).unwrap(); + Ok(self.create_pus_verif_tm_base( + service, + subservice, + &mut sp_header, + time_stamp, + source_data_len, + )) + } + + fn create_pus_verif_fail_tm<'a, E>( + &'a mut self, + service: u8, + subservice: u8, + req_id: &RequestId, + step: Option<&(impl EcssEnumeration + ?Sized)>, + params: &'a FailParams, + ) -> Result> { + let mut idx = 0; + let mut source_data_len = + RequestId::SIZE_AS_BYTES + params.failure_code.byte_width() as usize; + if let Some(step) = step { + source_data_len += step.byte_width() as usize; + } + if let Some(failure_data) = params.failure_data { + source_data_len += failure_data.len(); + } + self.source_buffer_large_enough(source_data_len)?; + req_id.to_bytes(&mut self.source_data_buf[0..RequestId::SIZE_AS_BYTES]); + idx += RequestId::SIZE_AS_BYTES; + if let Some(step) = step { + // Size check done beforehand + step.to_bytes(&mut self.source_data_buf[idx..idx + step.byte_width() as usize]) + .unwrap(); + idx += step.byte_width() as usize; + } + params.failure_code.to_bytes( + &mut self.source_data_buf[idx..idx + params.failure_code.byte_width() as usize], + )?; + idx += params.failure_code.byte_width() as usize; + if let Some(failure_data) = params.failure_data { + self.source_data_buf[idx..idx + failure_data.len()].copy_from_slice(failure_data); + } + let mut sp_header = SpHeader::tm(self.apid, 0, 0).unwrap(); + Ok(self.create_pus_verif_tm_base( + service, + subservice, + &mut sp_header, + params.time_stamp, + source_data_len, + )) + } + + fn source_buffer_large_enough(&self, len: usize) -> Result<(), VerificationError> { + if len > self.source_data_buf.capacity() { + return Err(VerificationError::ByteConversionError( + ByteConversionError::ToSliceTooSmall(SizeMissmatch { + found: self.source_data_buf.capacity(), + expected: len, + }), + )); + } + Ok(()) + } + + fn create_pus_verif_tm_base<'a>( + &'a mut self, + service: u8, + subservice: u8, + sp_header: &mut SpHeader, + time_stamp: &'a [u8], + source_data_len: usize, + ) -> PusTm { + let tm_sec_header = PusTmSecondaryHeader::new( + service, + subservice, + self.msg_count, + self.dest_id, + time_stamp, + ); + PusTm::new( + sp_header, + tm_sec_header, + Some(&self.source_data_buf[0..source_data_len]), + true, + ) + } +} + +/// Helper object which caches the sender passed as a trait object. Provides the same +/// API as [VerificationReporter] but without the explicit sender arguments. +pub struct VerificationReporterWithSender { + reporter: VerificationReporter, + pub sender: Box>, +} + +impl VerificationReporterWithSender { + pub fn new(cfg: VerificationReporterCfg, sender: Box>) -> Self { + Self::new_from_reporter(VerificationReporter::new(cfg), sender) + } + + pub fn new_from_reporter( + reporter: VerificationReporter, + sender: Box>, + ) -> Self { + Self { reporter, sender } + } + + delegate! { + to self.reporter { + pub fn add_tc(&mut self, pus_tc: &PusTc) -> VerificationToken; + pub fn add_tc_with_req_id(&mut self, req_id: RequestId) -> VerificationToken; + } + } + + pub fn acceptance_success( + &mut self, + token: VerificationToken, + time_stamp: &[u8], + ) -> Result, VerificationErrorWithToken> { + self.reporter + .acceptance_success(token, self.sender.as_mut(), time_stamp) + } + + pub fn acceptance_failure( + &mut self, + token: VerificationToken, + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + self.reporter + .acceptance_failure(token, self.sender.as_mut(), params) + } + + pub fn start_success( + &mut self, + token: VerificationToken, + time_stamp: &[u8], + ) -> Result, VerificationErrorWithToken> { + self.reporter + .start_success(token, self.sender.as_mut(), time_stamp) + } + + pub fn start_failure( + &mut self, + token: VerificationToken, + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + self.reporter + .start_failure(token, self.sender.as_mut(), params) + } + + pub fn step_success( + &mut self, + token: &VerificationToken, + time_stamp: &[u8], + step: impl EcssEnumeration, + ) -> Result<(), VerificationError> { + self.reporter + .step_success(token, self.sender.as_mut(), time_stamp, step) + } + + pub fn step_failure( + &mut self, + token: VerificationToken, + params: FailParamsWithStep, + ) -> Result<(), VerificationErrorWithToken> { + self.reporter + .step_failure(token, self.sender.as_mut(), params) + } + + pub fn completion_success( + &mut self, + token: VerificationToken, + time_stamp: &[u8], + ) -> Result<(), VerificationErrorWithToken> { + self.reporter + .completion_success(token, self.sender.as_mut(), time_stamp) + } + + pub fn completion_failure( + &mut self, + token: VerificationToken, + params: FailParams, + ) -> Result<(), VerificationErrorWithToken> { + self.reporter + .completion_failure(token, self.sender.as_mut(), params) + } +} + +#[cfg(feature = "std")] +mod stdmod { + use crate::pool::{ShareablePoolProvider, SharedPool, StoreAddr, StoreError}; + use crate::pus::verification::{ + VerificationError, VerificationReporterWithSender, VerificationSender, + }; + use delegate::delegate; + use spacepackets::tm::PusTm; + use std::sync::{mpsc, Arc, Mutex, RwLockWriteGuard}; + + pub type StdVerifReporterWithSender = VerificationReporterWithSender; + pub type SharedStdVerifReporterWithSender = Arc>; + + #[derive(Debug, Eq, PartialEq)] + pub enum StdVerifSenderError { + PoisonError, + StoreError(StoreError), + RxDisconnected(StoreAddr), + } + + impl From for StdVerifSenderError { + fn from(e: StoreError) -> Self { + StdVerifSenderError::StoreError(e) + } + } + + impl From for VerificationError { + fn from(e: StoreError) -> Self { + VerificationError::SendError(e.into()) + } + } + + trait SendBackend: Send { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr>; + } + + struct StdSenderBase { + pub ignore_poison_error: bool, + tm_store: SharedPool, + tx: S, + } + + impl StdSenderBase { + pub fn new(tm_store: SharedPool, tx: S) -> Self { + Self { + ignore_poison_error: false, + tm_store, + tx, + } + } + } + + unsafe impl Sync for StdSenderBase {} + unsafe impl Send for StdSenderBase {} + + impl SendBackend for mpsc::Sender { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { + self.send(addr).map_err(|_| addr) + } + } + + pub struct MpscVerifSender { + base: StdSenderBase>, + } + + /// Verification sender with a [mpsc::Sender] backend. + /// It implements the [VerificationSender] trait to be used as PUS Verification TM sender. + impl MpscVerifSender { + pub fn new(tm_store: SharedPool, tx: mpsc::Sender) -> Self { + Self { + base: StdSenderBase::new(tm_store, tx), + } + } + } + + //noinspection RsTraitImplementation + impl VerificationSender for MpscVerifSender { + delegate!( + to self.base { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; + } + ); + } + unsafe impl Sync for MpscVerifSender {} + unsafe impl Send for MpscVerifSender {} + + impl SendBackend for crossbeam_channel::Sender { + fn send(&self, addr: StoreAddr) -> Result<(), StoreAddr> { + self.send(addr).map_err(|_| addr) + } + } + + /// Verification sender with a [crossbeam_channel::Sender] backend. + /// It implements the [VerificationSender] trait to be used as PUS Verification TM sender + pub struct CrossbeamVerifSender { + base: StdSenderBase>, + } + + impl CrossbeamVerifSender { + pub fn new(tm_store: SharedPool, tx: crossbeam_channel::Sender) -> Self { + Self { + base: StdSenderBase::new(tm_store, tx), + } + } + } + + //noinspection RsTraitImplementation + impl VerificationSender for CrossbeamVerifSender { + delegate!( + to self.base { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError>; + } + ); + } + + unsafe impl Sync for CrossbeamVerifSender {} + unsafe impl Send for CrossbeamVerifSender {} + + impl VerificationSender for StdSenderBase { + fn send_verification_tm( + &mut self, + tm: PusTm, + ) -> Result<(), VerificationError> { + let operation = |mut mg: RwLockWriteGuard| { + let (addr, buf) = mg.free_element(tm.len_packed())?; + tm.write_to(buf).map_err(VerificationError::PusError)?; + drop(mg); + self.tx.send(addr).map_err(|_| { + VerificationError::SendError(StdVerifSenderError::RxDisconnected(addr)) + })?; + Ok(()) + }; + match self.tm_store.write() { + Ok(lock) => operation(lock), + Err(poison_error) => { + if self.ignore_poison_error { + operation(poison_error.into_inner()) + } else { + Err(VerificationError::SendError( + StdVerifSenderError::PoisonError, + )) + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::pus::verification::{ + FailParams, FailParamsWithStep, RequestId, StateNone, VerificationError, + VerificationReporter, VerificationReporterCfg, VerificationReporterWithSender, + VerificationSender, VerificationToken, + }; + use alloc::boxed::Box; + use alloc::format; + use alloc::vec::Vec; + use spacepackets::ecss::{EcssEnumU16, EcssEnumU32, EcssEnumU8, EcssEnumeration, PusPacket}; + use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; + use spacepackets::tm::{PusTm, PusTmSecondaryHeaderT}; + use spacepackets::{ByteConversionError, CcsdsPacket, SpHeader}; + use std::collections::VecDeque; + + const TEST_APID: u16 = 0x02; + const EMPTY_STAMP: [u8; 7] = [0; 7]; + + #[derive(Debug, Eq, PartialEq)] + struct TmInfo { + pub subservice: u8, + pub apid: u16, + pub msg_counter: u16, + pub dest_id: u16, + pub time_stamp: [u8; 7], + pub req_id: RequestId, + pub additional_data: Option>, + } + + #[derive(Default)] + struct TestSender { + pub service_queue: VecDeque, + } + + impl VerificationSender<()> for TestSender { + fn send_verification_tm(&mut self, tm: PusTm) -> Result<(), VerificationError<()>> { + assert_eq!(PusPacket::service(&tm), 1); + assert!(tm.source_data().is_some()); + let mut time_stamp = [0; 7]; + time_stamp.clone_from_slice(&tm.time_stamp()[0..7]); + let src_data = tm.source_data().unwrap(); + assert!(src_data.len() >= 4); + let req_id = RequestId::from_bytes(&src_data[0..RequestId::SIZE_AS_BYTES]).unwrap(); + let mut vec = None; + if src_data.len() > 4 { + let mut new_vec = Vec::new(); + new_vec.extend_from_slice(&src_data[RequestId::SIZE_AS_BYTES..]); + vec = Some(new_vec); + } + self.service_queue.push_back(TmInfo { + subservice: PusPacket::subservice(&tm), + apid: tm.apid(), + msg_counter: tm.msg_counter(), + dest_id: tm.dest_id(), + time_stamp, + req_id, + additional_data: vec, + }); + Ok(()) + } + } + + #[derive(Debug, Copy, Clone, Eq, PartialEq)] + struct DummyError {} + #[derive(Default)] + struct FallibleSender {} + + impl VerificationSender for FallibleSender { + fn send_verification_tm(&mut self, _: PusTm) -> Result<(), VerificationError> { + Err(VerificationError::SendError(DummyError {})) + } + } + + struct TestBase<'a> { + vr: VerificationReporter, + #[allow(dead_code)] + tc: PusTc<'a>, + } + + impl<'a> TestBase<'a> { + fn rep(&mut self) -> &mut VerificationReporter { + &mut self.vr + } + } + struct TestBaseWithHelper<'a, E> { + helper: VerificationReporterWithSender, + #[allow(dead_code)] + tc: PusTc<'a>, + } + + impl<'a, E> TestBaseWithHelper<'a, E> { + fn rep(&mut self) -> &mut VerificationReporter { + &mut self.helper.reporter + } + } + + fn base_reporter() -> VerificationReporter { + let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); + VerificationReporter::new(cfg) + } + + fn base_tc_init(app_data: Option<&[u8]>) -> (PusTc, RequestId) { + let mut sph = SpHeader::tc(TEST_APID, 0x34, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(17, 1); + let pus_tc = PusTc::new(&mut sph, tc_header, app_data, true); + let req_id = RequestId::new(&pus_tc); + (pus_tc, req_id) + } + + fn base_init(api_sel: bool) -> (TestBase<'static>, VerificationToken) { + let mut reporter = base_reporter(); + let (tc, req_id) = base_tc_init(None); + let init_tok; + if api_sel { + init_tok = reporter.add_tc_with_req_id(req_id); + } else { + init_tok = reporter.add_tc(&tc); + } + (TestBase { vr: reporter, tc }, init_tok) + } + + fn base_with_helper_init() -> ( + TestBaseWithHelper<'static, ()>, + VerificationToken, + ) { + let mut reporter = base_reporter(); + let (tc, _) = base_tc_init(None); + let init_tok = reporter.add_tc(&tc); + let sender = TestSender::default(); + let helper = VerificationReporterWithSender::new_from_reporter(reporter, Box::new(sender)); + (TestBaseWithHelper { helper, tc }, init_tok) + } + + fn acceptance_check(sender: &mut TestSender, req_id: &RequestId) { + let cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id: req_id.clone(), + }; + assert_eq!(sender.service_queue.len(), 1); + let info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_basic_acceptance_success() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + b.vr.acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + acceptance_check(&mut sender, &tok.req_id); + } + + #[test] + fn test_basic_acceptance_success_with_helper() { + let (mut b, tok) = base_with_helper_init(); + b.helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + acceptance_check(sender, &tok.req_id); + } + + #[test] + fn test_acceptance_send_fails() { + let (mut b, tok) = base_init(false); + let mut faulty_sender = FallibleSender::default(); + let res = + b.vr.acceptance_success(tok, &mut faulty_sender, &EMPTY_STAMP); + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.1, tok); + match err.0 { + VerificationError::SendError(e) => { + assert_eq!(e, DummyError {}) + } + _ => panic!("{}", format!("Unexpected error {:?}", err.0)), + } + } + + fn acceptance_fail_check(sender: &mut TestSender, req_id: RequestId, stamp_buf: [u8; 7]) { + let cmp_info = TmInfo { + time_stamp: stamp_buf, + subservice: 2, + dest_id: 5, + apid: TEST_APID, + msg_counter: 0, + additional_data: Some([0, 2].to_vec()), + req_id, + }; + assert_eq!(sender.service_queue.len(), 1); + let info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_basic_acceptance_failure() { + let (mut b, tok) = base_init(true); + b.rep().dest_id = 5; + let stamp_buf = [1, 2, 3, 4, 5, 6, 7]; + let mut sender = TestSender::default(); + let fail_code = EcssEnumU16::new(2); + let fail_params = FailParams::new(stamp_buf.as_slice(), &fail_code, None); + b.vr.acceptance_failure(tok, &mut sender, fail_params) + .expect("Sending acceptance success failed"); + acceptance_fail_check(&mut sender, tok.req_id, stamp_buf); + } + + #[test] + fn test_basic_acceptance_failure_with_helper() { + let (mut b, tok) = base_with_helper_init(); + b.rep().dest_id = 5; + let stamp_buf = [1, 2, 3, 4, 5, 6, 7]; + let fail_code = EcssEnumU16::new(2); + let fail_params = FailParams::new(stamp_buf.as_slice(), &fail_code, None); + b.helper + .acceptance_failure(tok, fail_params) + .expect("Sending acceptance success failed"); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + acceptance_fail_check(sender, tok.req_id, stamp_buf); + } + + #[test] + fn test_acceptance_fail_data_too_large() { + let (mut b, tok) = base_with_helper_init(); + b.rep().dest_id = 5; + let stamp_buf = [1, 2, 3, 4, 5, 6, 7]; + let fail_code = EcssEnumU16::new(2); + let fail_data: [u8; 16] = [0; 16]; + // 4 req ID + 1 byte step + 2 byte error code + 8 byte fail data + assert_eq!(b.rep().allowed_source_data_len(), 15); + let fail_params = + FailParams::new(stamp_buf.as_slice(), &fail_code, Some(fail_data.as_slice())); + let res = b.helper.acceptance_failure(tok, fail_params); + assert!(res.is_err()); + let err_with_token = res.unwrap_err(); + assert_eq!(err_with_token.1, tok); + match err_with_token.0 { + VerificationError::ByteConversionError(e) => match e { + ByteConversionError::ToSliceTooSmall(missmatch) => { + assert_eq!( + missmatch.expected, + fail_data.len() + RequestId::SIZE_AS_BYTES + fail_code.byte_width() + ); + assert_eq!(missmatch.found, b.rep().allowed_source_data_len()); + } + _ => { + panic!("{}", format!("Unexpected error {:?}", e)) + } + }, + _ => { + panic!("{}", format!("Unexpected error {:?}", err_with_token.0)) + } + } + } + + #[test] + fn test_basic_acceptance_failure_with_fail_data() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let fail_code = EcssEnumU8::new(10); + let fail_data = EcssEnumU32::new(12); + let mut fail_data_raw = [0; 4]; + fail_data.to_bytes(&mut fail_data_raw).unwrap(); + let fail_params = FailParams::new(&EMPTY_STAMP, &fail_code, Some(fail_data_raw.as_slice())); + b.vr.acceptance_failure(tok, &mut sender, fail_params) + .expect("Sending acceptance success failed"); + let cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 2, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: Some([10, 0, 0, 0, 12].to_vec()), + req_id: tok.req_id, + }; + assert_eq!(sender.service_queue.len(), 1); + let info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + fn start_fail_check(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { + assert_eq!(sender.service_queue.len(), 2); + let mut cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id, + }; + let mut info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 4, + dest_id: 0, + apid: TEST_APID, + msg_counter: 1, + additional_data: Some([&[22], fail_data_raw.as_slice()].concat().to_vec()), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_start_failure() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let fail_code = EcssEnumU8::new(22); + let fail_data: i32 = -12; + let mut fail_data_raw = [0; 4]; + fail_data_raw.copy_from_slice(fail_data.to_be_bytes().as_slice()); + let fail_params = FailParams::new(&EMPTY_STAMP, &fail_code, Some(fail_data_raw.as_slice())); + + let accepted_token = + b.vr.acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let empty = + b.vr.start_failure(accepted_token, &mut sender, fail_params) + .expect("Start failure failure"); + assert_eq!(empty, ()); + start_fail_check(&mut sender, tok.req_id, fail_data_raw); + } + + #[test] + fn test_start_failure_with_helper() { + let (mut b, tok) = base_with_helper_init(); + let fail_code = EcssEnumU8::new(22); + let fail_data: i32 = -12; + let mut fail_data_raw = [0; 4]; + fail_data_raw.copy_from_slice(fail_data.to_be_bytes().as_slice()); + let fail_params = FailParams::new(&EMPTY_STAMP, &fail_code, Some(fail_data_raw.as_slice())); + + let accepted_token = b + .helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let empty = b + .helper + .start_failure(accepted_token, fail_params) + .expect("Start failure failure"); + assert_eq!(empty, ()); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + start_fail_check(sender, tok.req_id, fail_data_raw); + } + + fn step_success_check(sender: &mut TestSender, req_id: RequestId) { + let mut cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id, + }; + let mut info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + cmp_info = TmInfo { + time_stamp: [0, 1, 0, 1, 0, 1, 0], + subservice: 3, + dest_id: 0, + apid: TEST_APID, + msg_counter: 1, + additional_data: None, + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 5, + dest_id: 0, + apid: TEST_APID, + msg_counter: 2, + additional_data: Some([0].to_vec()), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 5, + dest_id: 0, + apid: TEST_APID, + msg_counter: 3, + additional_data: Some([1].to_vec()), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_steps_success() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let accepted_token = b + .rep() + .acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = b + .rep() + .start_success(accepted_token, &mut sender, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let mut empty = b + .rep() + .step_success( + &started_token, + &mut sender, + &EMPTY_STAMP, + EcssEnumU8::new(0), + ) + .expect("Sending step 0 success failed"); + assert_eq!(empty, ()); + empty = + b.vr.step_success( + &started_token, + &mut sender, + &EMPTY_STAMP, + EcssEnumU8::new(1), + ) + .expect("Sending step 1 success failed"); + assert_eq!(empty, ()); + assert_eq!(sender.service_queue.len(), 4); + step_success_check(&mut sender, tok.req_id); + } + + #[test] + fn test_steps_success_with_helper() { + let (mut b, tok) = base_with_helper_init(); + let accepted_token = b + .helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = b + .helper + .start_success(accepted_token, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let mut empty = b + .helper + .step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(0)) + .expect("Sending step 0 success failed"); + assert_eq!(empty, ()); + empty = b + .helper + .step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(1)) + .expect("Sending step 1 success failed"); + assert_eq!(empty, ()); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + assert_eq!(sender.service_queue.len(), 4); + step_success_check(sender, tok.req_id); + } + + fn check_step_failure(sender: &mut TestSender, req_id: RequestId, fail_data_raw: [u8; 4]) { + assert_eq!(sender.service_queue.len(), 4); + let mut cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id, + }; + let mut info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: [0, 1, 0, 1, 0, 1, 0], + subservice: 3, + dest_id: 0, + apid: TEST_APID, + msg_counter: 1, + additional_data: None, + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 5, + dest_id: 0, + apid: TEST_APID, + msg_counter: 2, + additional_data: Some([0].to_vec()), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 6, + dest_id: 0, + apid: TEST_APID, + msg_counter: 3, + additional_data: Some( + [ + [1].as_slice(), + &[0, 0, 0x10, 0x20], + fail_data_raw.as_slice(), + ] + .concat() + .to_vec(), + ), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_step_failure() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let req_id = tok.req_id; + let fail_code = EcssEnumU32::new(0x1020); + let fail_data: f32 = -22.3232; + let mut fail_data_raw = [0; 4]; + fail_data_raw.copy_from_slice(fail_data.to_be_bytes().as_slice()); + let fail_step = EcssEnumU8::new(1); + let fail_params = FailParamsWithStep::new( + &EMPTY_STAMP, + &fail_step, + &fail_code, + Some(fail_data_raw.as_slice()), + ); + + let accepted_token = + b.vr.acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = + b.vr.start_success(accepted_token, &mut sender, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let mut empty = + b.vr.step_success( + &started_token, + &mut sender, + &EMPTY_STAMP, + EcssEnumU8::new(0), + ) + .expect("Sending completion success failed"); + assert_eq!(empty, ()); + empty = + b.vr.step_failure(started_token, &mut sender, fail_params) + .expect("Step failure failed"); + assert_eq!(empty, ()); + check_step_failure(&mut sender, req_id, fail_data_raw); + } + + #[test] + fn test_steps_failure_with_helper() { + let (mut b, tok) = base_with_helper_init(); + let req_id = tok.req_id; + let fail_code = EcssEnumU32::new(0x1020); + let fail_data: f32 = -22.3232; + let mut fail_data_raw = [0; 4]; + fail_data_raw.copy_from_slice(fail_data.to_be_bytes().as_slice()); + let fail_step = EcssEnumU8::new(1); + let fail_params = FailParamsWithStep::new( + &EMPTY_STAMP, + &fail_step, + &fail_code, + Some(fail_data_raw.as_slice()), + ); + + let accepted_token = b + .helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = b + .helper + .start_success(accepted_token, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let mut empty = b + .helper + .step_success(&started_token, &EMPTY_STAMP, EcssEnumU8::new(0)) + .expect("Sending completion success failed"); + assert_eq!(empty, ()); + empty = b + .helper + .step_failure(started_token, fail_params) + .expect("Step failure failed"); + assert_eq!(empty, ()); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + check_step_failure(sender, req_id, fail_data_raw); + } + + fn completion_fail_check(sender: &mut TestSender, req_id: RequestId) { + assert_eq!(sender.service_queue.len(), 3); + + let mut cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id, + }; + let mut info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: [0, 1, 0, 1, 0, 1, 0], + subservice: 3, + dest_id: 0, + apid: TEST_APID, + msg_counter: 1, + additional_data: None, + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 8, + dest_id: 0, + apid: TEST_APID, + msg_counter: 2, + additional_data: Some([0, 0, 0x10, 0x20].to_vec()), + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_completion_failure() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let req_id = tok.req_id; + let fail_code = EcssEnumU32::new(0x1020); + let fail_params = FailParams::new(&EMPTY_STAMP, &fail_code, None); + + let accepted_token = + b.vr.acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = + b.vr.start_success(accepted_token, &mut sender, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let empty = + b.vr.completion_failure(started_token, &mut sender, fail_params) + .expect("Completion failure"); + assert_eq!(empty, ()); + completion_fail_check(&mut sender, req_id); + } + + #[test] + fn test_completion_failure_with_helper() { + let (mut b, tok) = base_with_helper_init(); + let req_id = tok.req_id; + let fail_code = EcssEnumU32::new(0x1020); + let fail_params = FailParams::new(&EMPTY_STAMP, &fail_code, None); + + let accepted_token = b + .helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = b + .helper + .start_success(accepted_token, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let empty = b + .helper + .completion_failure(started_token, fail_params) + .expect("Completion failure"); + assert_eq!(empty, ()); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + completion_fail_check(sender, req_id); + } + + fn completion_success_check(sender: &mut TestSender, req_id: RequestId) { + assert_eq!(sender.service_queue.len(), 3); + let cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 1, + dest_id: 0, + apid: TEST_APID, + msg_counter: 0, + additional_data: None, + req_id, + }; + let mut info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + + let cmp_info = TmInfo { + time_stamp: [0, 1, 0, 1, 0, 1, 0], + subservice: 3, + dest_id: 0, + apid: TEST_APID, + msg_counter: 1, + additional_data: None, + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + let cmp_info = TmInfo { + time_stamp: EMPTY_STAMP, + subservice: 7, + dest_id: 0, + apid: TEST_APID, + msg_counter: 2, + additional_data: None, + req_id, + }; + info = sender.service_queue.pop_front().unwrap(); + assert_eq!(info, cmp_info); + } + + #[test] + fn test_complete_success_sequence() { + let (mut b, tok) = base_init(false); + let mut sender = TestSender::default(); + let accepted_token = + b.vr.acceptance_success(tok, &mut sender, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = + b.vr.start_success(accepted_token, &mut sender, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let empty = + b.vr.completion_success(started_token, &mut sender, &EMPTY_STAMP) + .expect("Sending completion success failed"); + assert_eq!(empty, ()); + completion_success_check(&mut sender, tok.req_id); + } + + #[test] + fn test_complete_success_sequence_with_helper() { + let (mut b, tok) = base_with_helper_init(); + let accepted_token = b + .helper + .acceptance_success(tok, &EMPTY_STAMP) + .expect("Sending acceptance success failed"); + let started_token = b + .helper + .start_success(accepted_token, &[0, 1, 0, 1, 0, 1, 0]) + .expect("Sending start success failed"); + let empty = b + .helper + .completion_success(started_token, &EMPTY_STAMP) + .expect("Sending completion success failed"); + assert_eq!(empty, ()); + let sender: &mut TestSender = b.helper.sender.downcast_mut().unwrap(); + completion_success_check(sender, tok.req_id); + } +} diff --git a/fsrc-core/src/tmtc/ccsds_distrib.rs b/fsrc-core/src/tmtc/ccsds_distrib.rs index 30feb3d..92d6c39 100644 --- a/fsrc-core/src/tmtc/ccsds_distrib.rs +++ b/fsrc-core/src/tmtc/ccsds_distrib.rs @@ -2,23 +2,23 @@ //! //! The routing components consist of two core components: //! 1. [CcsdsDistributor] component which dispatches received packets to a user-provided handler -//! 2. [ApidPacketHandler] trait which should be implemented by the user-provided packet handler. +//! 2. [CcsdsPacketHandler] trait which should be implemented by the user-provided packet handler. //! //! The [CcsdsDistributor] implements the [ReceivesCcsdsTc] and [ReceivesTc] trait which allows to //! pass raw or CCSDS packets to it. Upon receiving a packet, it performs the following steps: //! //! 1. It tries to identify the target Application Process Identifier (APID) based on the -//! respective CCSDS space packet header field. If that process fails, a [PacketError] is +//! respective CCSDS space packet header field. If that process fails, a [ByteConversionError] is //! returned to the user //! 2. If a valid APID is found and matches one of the APIDs provided by -//! [ApidPacketHandler::valid_apids], it will pass the packet to the user provided -//! [ApidPacketHandler::handle_known_apid] function. If no valid APID is found, the packet -//! will be passed to the [ApidPacketHandler::handle_unknown_apid] function. +//! [CcsdsPacketHandler::valid_apids], it will pass the packet to the user provided +//! [CcsdsPacketHandler::handle_known_apid] function. If no valid APID is found, the packet +//! will be passed to the [CcsdsPacketHandler::handle_unknown_apid] function. //! //! # Example //! //! ```rust -//! use fsrc_core::tmtc::ccsds_distrib::{ApidPacketHandler, CcsdsDistributor}; +//! use fsrc_core::tmtc::ccsds_distrib::{CcsdsPacketHandler, CcsdsDistributor}; //! use fsrc_core::tmtc::ReceivesTc; //! use spacepackets::{CcsdsPacket, SpHeader}; //! use spacepackets::tc::PusTc; @@ -33,7 +33,7 @@ //! fn mutable_foo(&mut self) {} //! } //! -//! impl ApidPacketHandler for ConcreteApidHandler { +//! impl CcsdsPacketHandler for ConcreteApidHandler { //! type Error = (); //! fn valid_apids(&self) -> &'static [u16] { &[0x002] } //! fn handle_known_apid(&mut self, sp_header: &SpHeader, tc_raw: &[u8]) -> Result<(), Self::Error> { @@ -87,7 +87,7 @@ use crate::tmtc::{ReceivesCcsdsTc, ReceivesTc}; use alloc::boxed::Box; use downcast_rs::Downcast; -use spacepackets::{CcsdsPacket, PacketError, SpHeader}; +use spacepackets::{ByteConversionError, CcsdsPacket, SizeMissmatch, SpHeader}; /// Generic trait for a handler or dispatcher object handling CCSDS packets. /// @@ -99,7 +99,7 @@ use spacepackets::{CcsdsPacket, PacketError, SpHeader}; /// This trait automatically implements the [downcast_rs::Downcast] to allow a more convenient API /// to cast trait objects back to their concrete type after the handler was passed to the /// distributor. -pub trait ApidPacketHandler: Downcast { +pub trait CcsdsPacketHandler: Downcast { type Error; fn valid_apids(&self) -> &'static [u16]; @@ -112,20 +112,20 @@ pub trait ApidPacketHandler: Downcast { ) -> Result<(), Self::Error>; } -downcast_rs::impl_downcast!(ApidPacketHandler assoc Error); +downcast_rs::impl_downcast!(CcsdsPacketHandler assoc Error); /// The CCSDS distributor dispatches received CCSDS packets to a user provided packet handler. pub struct CcsdsDistributor { /// User provided APID handler stored as a generic trait object. /// It can be cast back to the original concrete type using the [Self::apid_handler_ref] or /// the [Self::apid_handler_mut] method. - pub apid_handler: Box>, + pub apid_handler: Box>, } #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum CcsdsError { CustomError(E), - PacketError(PacketError), + PacketError(ByteConversionError), } impl ReceivesCcsdsTc for CcsdsDistributor { @@ -140,26 +140,34 @@ impl ReceivesTc for CcsdsDistributor { type Error = CcsdsError; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> { + if tc_raw.len() < 7 { + return Err(CcsdsError::PacketError( + ByteConversionError::FromSliceTooSmall(SizeMissmatch { + found: tc_raw.len(), + expected: 7, + }), + )); + } let sp_header = SpHeader::from_raw_slice(tc_raw).map_err(|e| CcsdsError::PacketError(e))?; self.dispatch_ccsds(&sp_header, tc_raw) } } impl CcsdsDistributor { - pub fn new(apid_handler: Box>) -> Self { + pub fn new(apid_handler: Box>) -> Self { CcsdsDistributor { apid_handler } } /// This function can be used to retrieve a reference to the concrete instance of the APID /// handler after it was passed to the distributor. See the /// [module documentation][crate::tmtc::ccsds_distrib] for an fsrc-example. - pub fn apid_handler_ref>(&self) -> Option<&T> { + pub fn apid_handler_ref>(&self) -> Option<&T> { self.apid_handler.downcast_ref::() } /// This function can be used to retrieve a mutable reference to the concrete instance of the /// APID handler after it was passed to the distributor. - pub fn apid_handler_mut>(&mut self) -> Option<&mut T> { + pub fn apid_handler_mut>(&mut self) -> Option<&mut T> { self.apid_handler.downcast_mut::() } @@ -183,7 +191,7 @@ impl CcsdsDistributor { #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::tmtc::ccsds_distrib::{ApidPacketHandler, CcsdsDistributor}; + use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; use spacepackets::tc::PusTc; use spacepackets::CcsdsPacket; use std::collections::VecDeque; @@ -209,7 +217,7 @@ pub(crate) mod tests { pub unknown_packet_queue: VecDeque<(u16, Vec)>, } - impl ApidPacketHandler for BasicApidHandlerSharedQueue { + impl CcsdsPacketHandler for BasicApidHandlerSharedQueue { type Error = (); fn valid_apids(&self) -> &'static [u16] { &[0x000, 0x002] @@ -244,7 +252,7 @@ pub(crate) mod tests { } } - impl ApidPacketHandler for BasicApidHandlerOwnedQueue { + impl CcsdsPacketHandler for BasicApidHandlerOwnedQueue { type Error = (); fn valid_apids(&self) -> &'static [u16] { diff --git a/fsrc-core/src/tmtc/mod.rs b/fsrc-core/src/tmtc/mod.rs index 9436047..b1d0b3d 100644 --- a/fsrc-core/src/tmtc/mod.rs +++ b/fsrc-core/src/tmtc/mod.rs @@ -6,6 +6,7 @@ //! Application Process ID (APID) or the ECSS PUS service type. This allows for fast packet //! routing without the overhead and complication of using message queues. However, it also requires use crate::error::{FsrcErrorRaw, FsrcGroupIds}; +use downcast_rs::{impl_downcast, Downcast}; use spacepackets::tc::PusTc; use spacepackets::SpHeader; @@ -13,6 +14,10 @@ use spacepackets::SpHeader; pub mod ccsds_distrib; #[cfg(feature = "alloc")] pub mod pus_distrib; +pub mod tm_helper; + +pub use ccsds_distrib::{CcsdsDistributor, CcsdsError, CcsdsPacketHandler}; +pub use pus_distrib::{PusDistributor, PusServiceProvider}; const _RAW_PACKET_ERROR: &str = "raw-tmtc"; const _CCSDS_ERROR: &str = "ccsds-tmtc"; @@ -39,11 +44,13 @@ const _FROM_BYTES_ZEROCOPY_ERROR: FsrcErrorRaw = FsrcErrorRaw::new( /// This trait is implemented by both the [crate::tmtc::pus_distrib::PusDistributor] and the /// [crate::tmtc::ccsds_distrib::CcsdsDistributor] which allows to pass the respective packets in /// raw byte format into them. -pub trait ReceivesTc { +pub trait ReceivesTc: Downcast { type Error; fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error>; } +impl_downcast!(ReceivesTc assoc Error); + /// Generic trait for object which can receive CCSDS space packets, for fsrc-example ECSS PUS packets /// for CCSDS File Delivery Protocol (CFDP) packets. /// diff --git a/fsrc-core/src/tmtc/pus_distrib.rs b/fsrc-core/src/tmtc/pus_distrib.rs index 906ad20..9ee8bc6 100644 --- a/fsrc-core/src/tmtc/pus_distrib.rs +++ b/fsrc-core/src/tmtc/pus_distrib.rs @@ -138,7 +138,7 @@ mod tests { use crate::tmtc::ccsds_distrib::tests::{ generate_ping_tc, BasicApidHandlerOwnedQueue, BasicApidHandlerSharedQueue, }; - use crate::tmtc::ccsds_distrib::{ApidPacketHandler, CcsdsDistributor}; + use crate::tmtc::ccsds_distrib::{CcsdsDistributor, CcsdsPacketHandler}; use alloc::vec::Vec; use spacepackets::ecss::PusError; use spacepackets::tc::PusTc; @@ -239,11 +239,11 @@ mod tests { }; } - impl ApidPacketHandler for ApidHandlerOwned { + impl CcsdsPacketHandler for ApidHandlerOwned { apid_handler_impl!(); } - impl ApidPacketHandler for ApidHandlerShared { + impl CcsdsPacketHandler for ApidHandlerShared { apid_handler_impl!(); } diff --git a/fsrc-core/src/tmtc/tm_helper.rs b/fsrc-core/src/tmtc/tm_helper.rs new file mode 100644 index 0000000..c8581bf --- /dev/null +++ b/fsrc-core/src/tmtc/tm_helper.rs @@ -0,0 +1,51 @@ +use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; +use spacepackets::tm::{PusTm, PusTmSecondaryHeader}; +use spacepackets::SpHeader; + +pub struct PusTmWithCdsShortHelper { + apid: u16, + cds_short_buf: [u8; 7], +} + +impl PusTmWithCdsShortHelper { + pub fn new(apid: u16) -> Self { + Self { + apid, + cds_short_buf: [0; 7], + } + } + + #[cfg(feature = "std")] + pub fn create_pus_tm_timestamp_now<'a>( + &'a mut self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + ) -> PusTm { + let time_stamp = CdsShortTimeProvider::from_now().unwrap(); + time_stamp.write_to_bytes(&mut self.cds_short_buf).unwrap(); + self.create_pus_tm_common(service, subservice, source_data) + } + + pub fn create_pus_tm_with_stamp<'a>( + &'a mut self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + stamper: &CdsShortTimeProvider, + ) -> PusTm { + stamper.write_to_bytes(&mut self.cds_short_buf).unwrap(); + self.create_pus_tm_common(service, subservice, source_data) + } + + fn create_pus_tm_common<'a>( + &'a self, + service: u8, + subservice: u8, + source_data: Option<&'a [u8]>, + ) -> PusTm { + let mut reply_header = SpHeader::tm(self.apid, 0, 0).unwrap(); + let tc_header = PusTmSecondaryHeader::new_simple(service, subservice, &self.cds_short_buf); + PusTm::new(&mut reply_header, tc_header, source_data, true) + } +} diff --git a/fsrc-core/tests/pool_test.rs b/fsrc-core/tests/pool_test.rs index f06f302..3187fd9 100644 --- a/fsrc-core/tests/pool_test.rs +++ b/fsrc-core/tests/pool_test.rs @@ -1,4 +1,4 @@ -use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, StoreAddr}; +use fsrc_core::pool::{LocalPool, PoolCfg, PoolGuard, PoolProvider, StoreAddr}; use std::ops::DerefMut; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; @@ -10,11 +10,11 @@ const DUMMY_DATA: [u8; 4] = [0, 1, 2, 3]; #[test] fn threaded_usage() { let pool_cfg = PoolCfg::new(vec![(16, 6), (32, 3), (8, 12)]); - let shared_dummy = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); - let shared_clone = shared_dummy.clone(); + let shared_pool = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let shared_clone = shared_pool.clone(); let (tx, rx): (Sender, Receiver) = mpsc::channel(); let jh0 = thread::spawn(move || { - let mut dummy = shared_dummy.write().unwrap(); + let mut dummy = shared_pool.write().unwrap(); let addr = dummy.add(&DUMMY_DATA).expect("Writing data failed"); tx.send(addr).expect("Sending store address failed"); }); diff --git a/fsrc-core/tests/verification_test.rs b/fsrc-core/tests/verification_test.rs new file mode 100644 index 0000000..22f351e --- /dev/null +++ b/fsrc-core/tests/verification_test.rs @@ -0,0 +1,188 @@ +use fsrc_core::pool::{LocalPool, PoolCfg, PoolProvider, SharedPool}; +use fsrc_core::pus::verification::{ + CrossbeamVerifSender, FailParams, RequestId, VerificationReporterCfg, + VerificationReporterWithSender, +}; +use hashbrown::HashMap; +use spacepackets::ecss::{EcssEnumU16, EcssEnumU8, PusPacket}; +use spacepackets::tc::{PusTc, PusTcSecondaryHeader}; +use spacepackets::tm::PusTm; +use spacepackets::SpHeader; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread; +use std::time::Duration; + +const TEST_APID: u16 = 0x03; +const FIXED_STAMP: [u8; 7] = [0; 7]; +const PACKETS_SENT: u8 = 8; + +/// This test also shows how the verification report could be used in a multi-threaded context, +/// wrapping it into an [Arc] and [Mutex] and then passing it to two threads. +/// +/// - The first thread generates a acceptance, a start, two steps and one completion report +/// - The second generates an acceptance and start success report and a completion failure +/// - The third thread is the verification receiver. In the test case, it verifies the other two +/// threads have sent the correct expected verification reports +#[test] +fn test_shared_reporter() { + let cfg = VerificationReporterCfg::new(TEST_APID, 1, 2, 8); + // Shared pool object to store the verification PUS telemetry + let pool_cfg = PoolCfg::new(vec![(10, 32), (10, 64), (10, 128), (10, 1024)]); + let shared_tm_pool: SharedPool = + Arc::new(RwLock::new(Box::new(LocalPool::new(pool_cfg.clone())))); + let shared_tc_pool_0 = Arc::new(RwLock::new(LocalPool::new(pool_cfg))); + let shared_tc_pool_1 = shared_tc_pool_0.clone(); + let (tx, rx) = crossbeam_channel::bounded(5); + let sender = CrossbeamVerifSender::new(shared_tm_pool.clone(), tx.clone()); + let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( + cfg, + Box::new(sender), + ))); + let reporter_with_sender_1 = reporter_with_sender_0.clone(); + // For test purposes, we retrieve the request ID from the TCs and pass them to the receiver + // tread. + let req_id_0; + let req_id_1; + + let (tx_tc_0, rx_tc_0) = crossbeam_channel::bounded(3); + let (tx_tc_1, rx_tc_1) = crossbeam_channel::bounded(3); + { + let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let mut sph = SpHeader::tc(TEST_APID, 0, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(17, 1); + let pus_tc_0 = PusTc::new(&mut sph, tc_header, None, true); + req_id_0 = RequestId::new(&pus_tc_0); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_0.write_to(&mut buf).unwrap(); + tx_tc_0.send(addr).unwrap(); + let mut sph = SpHeader::tc(TEST_APID, 1, 0).unwrap(); + let tc_header = PusTcSecondaryHeader::new_simple(5, 1); + let pus_tc_1 = PusTc::new(&mut sph, tc_header, None, true); + req_id_1 = RequestId::new(&pus_tc_1); + let (addr, mut buf) = tc_guard.free_element(pus_tc_0.len_packed()).unwrap(); + pus_tc_1.write_to(&mut buf).unwrap(); + tx_tc_1.send(addr).unwrap(); + } + let verif_sender_0 = thread::spawn(move || { + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_0 + .recv_timeout(Duration::from_millis(20)) + .expect("Receive timeout"); + let tc_len; + { + let mut tc_guard = shared_tc_pool_0.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); + } + let (_tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap(); + let accepted_token; + { + let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); + let token = mg.add_tc_with_req_id(req_id_0); + accepted_token = mg + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + } + // Do some start handling here + let started_token; + { + let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); + started_token = mg + .start_success(accepted_token, &FIXED_STAMP) + .expect("Start success failed"); + // Do some step handling here + mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(0)) + .expect("Start success failed"); + } + // Finish up + let mut mg = reporter_with_sender_0.lock().expect("Locking mutex failed"); + mg.step_success(&started_token, &FIXED_STAMP, EcssEnumU8::new(1)) + .expect("Start success failed"); + mg.completion_success(started_token, &FIXED_STAMP) + .expect("Completion success failed"); + }); + + let verif_sender_1 = thread::spawn(move || { + let mut tc_buf: [u8; 1024] = [0; 1024]; + let tc_addr = rx_tc_1 + .recv_timeout(Duration::from_millis(20)) + .expect("Receive timeout"); + let tc_len; + { + let mut tc_guard = shared_tc_pool_1.write().unwrap(); + let pg = tc_guard.read_with_guard(tc_addr); + let buf = pg.read().unwrap(); + tc_len = buf.len(); + tc_buf[0..tc_len].copy_from_slice(buf); + } + let (tc, _) = PusTc::new_from_raw_slice(&tc_buf[0..tc_len]).unwrap(); + let mut mg = reporter_with_sender_1 + .lock() + .expect("Locking reporter failed"); + let token = mg.add_tc(&tc); + let accepted_token = mg + .acceptance_success(token, &FIXED_STAMP) + .expect("Acceptance success failed"); + let started_token = mg + .start_success(accepted_token, &FIXED_STAMP) + .expect("Start success failed"); + let fail_code = EcssEnumU16::new(2); + let params = FailParams::new(&FIXED_STAMP, &fail_code, None); + mg.completion_failure(started_token, params) + .expect("Completion success failed"); + }); + + let verif_receiver = thread::spawn(move || { + let mut packet_counter = 0; + let mut tm_buf: [u8; 1024] = [0; 1024]; + let mut verif_map = HashMap::new(); + while packet_counter < PACKETS_SENT { + let verif_addr = rx + .recv_timeout(Duration::from_millis(50)) + .expect("Packet reception timeout"); + let tm_len; + { + let mut rg = shared_tm_pool.write().expect("Error locking shared pool"); + let store_guard = rg.read_with_guard(verif_addr); + let slice = store_guard.read().expect("Error reading TM slice"); + tm_len = slice.len(); + tm_buf[0..tm_len].copy_from_slice(slice); + } + let (pus_tm, _) = PusTm::new_from_raw_slice(&tm_buf[0..tm_len], 7) + .expect("Error reading verification TM"); + let req_id = RequestId::from_bytes( + &pus_tm.source_data().expect("Invalid TM source data")[0..RequestId::SIZE_AS_BYTES], + ) + .unwrap(); + if !verif_map.contains_key(&req_id) { + let mut content = Vec::new(); + content.push(pus_tm.subservice()); + verif_map.insert(req_id, content); + } else { + let content = verif_map.get_mut(&req_id).unwrap(); + content.push(pus_tm.subservice()) + } + packet_counter += 1; + } + for (req_id, content) in verif_map { + if req_id == req_id_1 { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 8); + } else if req_id == req_id_0 { + assert_eq!(content[0], 1); + assert_eq!(content[1], 3); + assert_eq!(content[2], 5); + assert_eq!(content[3], 5); + assert_eq!(content[4], 7); + } else { + panic!("Unexpected request ID {:?}", req_id); + } + } + }); + verif_sender_0.join().expect("Joining thread 0 failed"); + verif_sender_1.join().expect("Joining thread 1 failed"); + verif_receiver.join().expect("Joining thread 2 failed"); +} diff --git a/fsrc-example/Cargo.toml b/fsrc-example/Cargo.toml index 1ddb71a..8bccc84 100644 --- a/fsrc-example/Cargo.toml +++ b/fsrc-example/Cargo.toml @@ -4,5 +4,12 @@ version = "0.1.0" edition = "2021" authors = ["Robin Mueller "] +[dependencies] +crossbeam-channel = "0.5" +delegate = "0.8" + [dependencies.spacepackets] path = "../spacepackets" + +[dependencies.fsrc-core] +path = "../fsrc-core" diff --git a/fsrc-example/src/bin/client.rs b/fsrc-example/src/bin/client.rs index 5956570..1f56418 100644 --- a/fsrc-example/src/bin/client.rs +++ b/fsrc-example/src/bin/client.rs @@ -1,16 +1,95 @@ +use fsrc_core::pus::verification::RequestId; use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; +use spacepackets::ecss::PusPacket; use spacepackets::tc::PusTc; +use spacepackets::tm::PusTm; use spacepackets::SpHeader; use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::time::Duration; fn main() { let mut buf = [0; 32]; let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); let mut sph = SpHeader::tc(0x02, 0, 0).unwrap(); let pus_tc = PusTc::new_simple(&mut sph, 17, 1, None, true); - let client = UdpSocket::bind("127.0.0.1:7300").expect("Connecting to UDP server failed"); + let client = UdpSocket::bind("127.0.0.1:7302").expect("Connecting to UDP server failed"); + let tc_req_id = RequestId::new(&pus_tc); + println!( + "Packing and sending PUS ping command TC[17,1] with request ID {}", + tc_req_id + ); let size = pus_tc.write_to(&mut buf).expect("Creating PUS TC failed"); client .send_to(&buf[0..size], &addr) .expect(&*format!("Sending to {:?} failed", addr)); + client + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("Setting read timeout failed"); + loop { + let res = client.recv(&mut buf); + match res { + Ok(_len) => { + let (pus_tm, size) = + PusTm::new_from_raw_slice(&buf, 7).expect("Parsing PUS TM failed"); + if pus_tm.service() == 17 && pus_tm.subservice() == 2 { + println!("Received PUS Ping Reply TM[17,2]") + } else if pus_tm.service() == 1 { + if pus_tm.source_data().is_none() { + println!("Invalid verification TM, no source data"); + } + let src_data = pus_tm.source_data().unwrap(); + if src_data.len() < 4 { + println!("Invalid verification TM source data, less than 4 bytes") + } + let req_id = RequestId::from_bytes(src_data).unwrap(); + if pus_tm.subservice() == 1 { + println!( + "Received TM[1,1] acceptance success for request ID {}", + req_id + ) + } else if pus_tm.subservice() == 2 { + println!( + "Received TM[1,2] acceptance failure for request ID {}", + req_id + ) + } else if pus_tm.subservice() == 3 { + println!("Received TM[1,3] start success for request ID {}", req_id) + } else if pus_tm.subservice() == 4 { + println!("Received TM[1,2] start failure for request ID {}", req_id) + } else if pus_tm.subservice() == 5 { + println!("Received TM[1,5] step success for request ID {}", req_id) + } else if pus_tm.subservice() == 6 { + println!("Received TM[1,6] step failure for request ID {}", req_id) + } else if pus_tm.subservice() == 7 { + println!( + "Received TM[1,7] completion success for request ID {}", + req_id + ) + } else if pus_tm.subservice() == 8 { + println!( + "Received TM[1,8] completion failure for request ID {}", + req_id + ); + } + } else { + println!( + "Received TM[{}, {}] with {} bytes", + pus_tm.service(), + pus_tm.subservice(), + size + ); + } + } + Err(ref e) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::TimedOut => + { + println!("No reply received for 2 seconds"); + break; + } + _ => { + println!("UDP receive error {:?}", res.unwrap_err()); + } + } + } } diff --git a/fsrc-example/src/bin/obsw.rs b/fsrc-example/src/bin/obsw.rs deleted file mode 100644 index 690cd3e..0000000 --- a/fsrc-example/src/bin/obsw.rs +++ /dev/null @@ -1,15 +0,0 @@ -use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; -use std::net::{IpAddr, SocketAddr, UdpSocket}; - -fn main() { - let mut recv_buf = [0; 1024]; - let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); - let socket = UdpSocket::bind(&addr).expect("Error opening UDP socket"); - loop { - let (num_bytes, src) = socket.recv_from(&mut recv_buf).expect("UDP Receive error"); - println!( - "Received TM with len {num_bytes} from {src}: {:x?}", - &recv_buf[0..num_bytes] - ); - } -} diff --git a/fsrc-example/src/bin/obsw/ccsds.rs b/fsrc-example/src/bin/obsw/ccsds.rs new file mode 100644 index 0000000..b058e4d --- /dev/null +++ b/fsrc-example/src/bin/obsw/ccsds.rs @@ -0,0 +1,37 @@ +use crate::tmtc::PUS_APID; +use fsrc_core::tmtc::{CcsdsPacketHandler, PusDistributor, ReceivesCcsdsTc}; +use spacepackets::{CcsdsPacket, SpHeader}; + +pub struct CcsdsReceiver { + pub pus_handler: PusDistributor<()>, +} + +impl CcsdsPacketHandler for CcsdsReceiver { + type Error = (); + + fn valid_apids(&self) -> &'static [u16] { + &[PUS_APID] + } + + fn handle_known_apid( + &mut self, + sp_header: &SpHeader, + tc_raw: &[u8], + ) -> Result<(), Self::Error> { + if sp_header.apid() == PUS_APID { + self.pus_handler + .pass_ccsds(sp_header, tc_raw) + .expect("Handling PUS packet failed"); + } + Ok(()) + } + + fn handle_unknown_apid( + &mut self, + _sp_header: &SpHeader, + _tc_raw: &[u8], + ) -> Result<(), Self::Error> { + println!("Unknown APID detected"); + Ok(()) + } +} diff --git a/fsrc-example/src/bin/obsw/main.rs b/fsrc-example/src/bin/obsw/main.rs new file mode 100644 index 0000000..00ad086 --- /dev/null +++ b/fsrc-example/src/bin/obsw/main.rs @@ -0,0 +1,73 @@ +mod ccsds; +mod pus; +mod tmtc; + +use crate::tmtc::{core_tmtc_task, TmStore, PUS_APID}; +use fsrc_core::hal::host::udp_server::UdpTcServer; +use fsrc_core::pool::{LocalPool, PoolCfg, SharedPool, StoreAddr}; +use fsrc_core::pus::verification::{ + MpscVerifSender, VerificationReporterCfg, VerificationReporterWithSender, +}; +use fsrc_core::tmtc::CcsdsError; +use fsrc_example::{OBSW_SERVER_ADDR, SERVER_PORT}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{mpsc, Arc, Mutex, RwLock}; +use std::thread; + +struct TmFunnel { + tm_funnel_rx: mpsc::Receiver, + tm_server_tx: mpsc::Sender, +} + +struct UdpTmtcServer { + udp_tc_server: UdpTcServer>, + tm_rx: mpsc::Receiver, + tm_store: SharedPool, +} + +unsafe impl Send for UdpTmtcServer {} + +fn main() { + println!("Running OBSW example"); + let pool_cfg = PoolCfg::new(vec![(8, 32), (4, 64), (2, 128)]); + let tm_pool = LocalPool::new(pool_cfg); + let tm_store: SharedPool = Arc::new(RwLock::new(Box::new(tm_pool))); + let tm_store_helper = TmStore { + pool: tm_store.clone(), + }; + let addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT); + let (tm_funnel_tx, tm_funnel_rx) = mpsc::channel(); + let (tm_server_tx, tm_server_rx) = mpsc::channel(); + let sender = MpscVerifSender::new(tm_store.clone(), tm_funnel_tx.clone()); + let verif_cfg = VerificationReporterCfg::new(PUS_APID, 1, 2, 8); + let reporter_with_sender_0 = Arc::new(Mutex::new(VerificationReporterWithSender::new( + verif_cfg, + Box::new(sender), + ))); + let jh0 = thread::spawn(move || { + core_tmtc_task( + tm_funnel_tx.clone(), + tm_server_rx, + tm_store_helper.clone(), + addr, + reporter_with_sender_0, + ); + }); + + let jh1 = thread::spawn(move || { + let tm_funnel = TmFunnel { + tm_server_tx, + tm_funnel_rx, + }; + loop { + if let Ok(addr) = tm_funnel.tm_funnel_rx.recv() { + tm_funnel + .tm_server_tx + .send(addr) + .expect("Sending TM to server failed"); + } + } + }); + jh0.join().expect("Joining UDP TMTC server thread failed"); + jh1.join().expect("Joining TM Funnel thread failed"); +} diff --git a/fsrc-example/src/bin/obsw/pus.rs b/fsrc-example/src/bin/obsw/pus.rs new file mode 100644 index 0000000..f4c59e3 --- /dev/null +++ b/fsrc-example/src/bin/obsw/pus.rs @@ -0,0 +1,93 @@ +use crate::tmtc::TmStore; +use fsrc_core::pool::StoreAddr; +use fsrc_core::pus::verification::{ + SharedStdVerifReporterWithSender, StateAccepted, VerificationToken, +}; +use fsrc_core::tmtc::tm_helper::PusTmWithCdsShortHelper; +use fsrc_core::tmtc::PusServiceProvider; +use spacepackets::tc::{PusTc, PusTcSecondaryHeaderT}; +use spacepackets::time::{CdsShortTimeProvider, TimeWriter}; +use spacepackets::SpHeader; +use std::sync::mpsc; + +pub struct PusReceiver { + pub tm_helper: PusTmWithCdsShortHelper, + pub tm_tx: mpsc::Sender, + pub tm_store: TmStore, + pub verif_reporter: SharedStdVerifReporterWithSender, + stamper: CdsShortTimeProvider, + time_stamp: [u8; 7], +} + +impl PusReceiver { + pub fn new( + apid: u16, + tm_tx: mpsc::Sender, + tm_store: TmStore, + verif_reporter: SharedStdVerifReporterWithSender, + ) -> Self { + Self { + tm_helper: PusTmWithCdsShortHelper::new(apid), + tm_tx, + tm_store, + verif_reporter, + stamper: CdsShortTimeProvider::default(), + time_stamp: [0; 7], + } + } +} + +impl PusServiceProvider for PusReceiver { + type Error = (); + + fn handle_pus_tc_packet( + &mut self, + service: u8, + _header: &SpHeader, + pus_tc: &PusTc, + ) -> Result<(), Self::Error> { + let mut reporter = self + .verif_reporter + .lock() + .expect("Locking Verification Reporter failed"); + let init_token = reporter.add_tc(pus_tc); + self.stamper + .update_from_now() + .expect("Updating time for time stamp failed"); + self.stamper + .write_to_bytes(&mut self.time_stamp) + .expect("Writing time stamp failed"); + let accepted_token = reporter + .acceptance_success(init_token, &self.time_stamp) + .expect("Acceptance success failure"); + drop(reporter); + if service == 17 { + self.handle_test_service(pus_tc, accepted_token); + } + Ok(()) + } +} + +impl PusReceiver { + fn handle_test_service(&mut self, pus_tc: &PusTc, token: VerificationToken) { + if pus_tc.subservice() == 1 { + println!("Received PUS ping command TC[17,1]"); + println!("Sending ping reply PUS TM[17,2]"); + let ping_reply = self.tm_helper.create_pus_tm_timestamp_now(17, 2, None); + let addr = self.tm_store.add_pus_tm(&ping_reply); + let mut reporter = self + .verif_reporter + .lock() + .expect("Error locking verification reporter"); + let start_token = reporter + .start_success(token, &self.time_stamp) + .expect("Error sending start success"); + self.tm_tx + .send(addr) + .expect("Sending TM to TM funnel failed"); + reporter + .completion_success(start_token, &self.time_stamp) + .expect("Error sending completion success"); + } + } +} diff --git a/fsrc-example/src/bin/obsw/tmtc.rs b/fsrc-example/src/bin/obsw/tmtc.rs new file mode 100644 index 0000000..5a34d1e --- /dev/null +++ b/fsrc-example/src/bin/obsw/tmtc.rs @@ -0,0 +1,110 @@ +use fsrc_core::hal::host::udp_server::{ReceiveResult, UdpTcServer}; +use std::net::SocketAddr; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +use crate::ccsds::CcsdsReceiver; +use crate::pus::PusReceiver; +use crate::UdpTmtcServer; +use fsrc_core::pool::{SharedPool, StoreAddr}; +use fsrc_core::pus::verification::SharedStdVerifReporterWithSender; +use fsrc_core::tmtc::{CcsdsDistributor, CcsdsError, PusDistributor}; +use spacepackets::tm::PusTm; + +pub const PUS_APID: u16 = 0x02; + +#[derive(Clone)] +pub struct TmStore { + pub pool: SharedPool, +} + +impl TmStore { + pub fn add_pus_tm(&mut self, pus_tm: &PusTm) -> StoreAddr { + let mut pg = self.pool.write().expect("Error locking TM store"); + let (addr, buf) = pg.free_element(pus_tm.len_packed()).expect("Store error"); + pus_tm + .write_to(buf) + .expect("Writing PUS TM to store failed"); + addr + } +} + +pub fn core_tmtc_task( + tm_creator_tx: mpsc::Sender, + tm_server_rx: mpsc::Receiver, + tm_store_helper: TmStore, + addr: SocketAddr, + verif_reporter: SharedStdVerifReporterWithSender, +) { + let pus_receiver = PusReceiver::new( + PUS_APID, + tm_creator_tx, + tm_store_helper.clone(), + verif_reporter, + ); + let pus_distributor = PusDistributor::new(Box::new(pus_receiver)); + let ccsds_receiver = CcsdsReceiver { + pus_handler: pus_distributor, + }; + let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver)); + let udp_tc_server = UdpTcServer::new(addr, 2048, Box::new(ccsds_distributor)) + .expect("Creating UDP TMTC server failed"); + + let mut udp_tmtc_server = UdpTmtcServer { + udp_tc_server, + tm_rx: tm_server_rx, + tm_store: tm_store_helper.pool.clone(), + }; + loop { + core_tmtc_loop(&mut udp_tmtc_server); + thread::sleep(Duration::from_millis(400)); + } +} + +fn core_tmtc_loop(udp_tmtc_server: &mut UdpTmtcServer) { + while core_tc_handling(udp_tmtc_server) {} + if let Some(recv_addr) = udp_tmtc_server.udp_tc_server.last_sender() { + core_tm_handling(udp_tmtc_server, &recv_addr); + } +} + +fn core_tc_handling(udp_tmtc_server: &mut UdpTmtcServer) -> bool { + match udp_tmtc_server.udp_tc_server.try_recv_tc() { + Ok(_) => true, + Err(e) => match e { + ReceiveResult::ReceiverError(e) => match e { + CcsdsError::PacketError(e) => { + println!("Got packet error: {e:?}"); + true + } + CcsdsError::CustomError(_) => { + println!("Unknown receiver error"); + true + } + }, + ReceiveResult::IoError(e) => { + println!("IO error {e}"); + false + } + ReceiveResult::NothingReceived => false, + }, + } +} + +fn core_tm_handling(udp_tmtc_server: &mut UdpTmtcServer, recv_addr: &SocketAddr) { + while let Ok(addr) = udp_tmtc_server.tm_rx.try_recv() { + let mut store_lock = udp_tmtc_server + .tm_store + .write() + .expect("Locking TM store failed"); + let pg = store_lock.read_with_guard(addr); + let buf = pg.read().expect("Error reading TM pool data"); + println!("Sending TM"); + udp_tmtc_server + .udp_tc_server + .socket + .send_to(buf, recv_addr) + .expect("Sending TM failed"); + } +} diff --git a/fsrc-example/src/bin/test.rs b/fsrc-example/src/bin/test.rs new file mode 100644 index 0000000..a979878 --- /dev/null +++ b/fsrc-example/src/bin/test.rs @@ -0,0 +1,41 @@ +use crossbeam_channel::{bounded, Receiver, Sender}; +use std::thread; + +trait FieldDataProvider: Send { + fn get_data(&self) -> &[u8]; +} + +struct FixedFieldDataWrapper { + data: [u8; 8], +} + +impl FixedFieldDataWrapper { + pub fn from_two_u32(p0: u32, p1: u32) -> Self { + let mut data = [0; 8]; + data[0..4].copy_from_slice(p0.to_be_bytes().as_slice()); + data[4..8].copy_from_slice(p1.to_be_bytes().as_slice()); + Self { data } + } +} + +impl FieldDataProvider for FixedFieldDataWrapper { + fn get_data(&self) -> &[u8] { + self.data.as_slice() + } +} + +type FieldDataTraitObj = Box; + +fn main() { + let (s0, r0): (Sender, Receiver) = bounded(5); + let data_wrapper = FixedFieldDataWrapper::from_two_u32(2, 3); + s0.send(Box::new(data_wrapper)).unwrap(); + let jh0 = thread::spawn(move || { + let data = r0.recv().unwrap(); + let raw = data.get_data(); + println!("Received data {:?}", raw); + }); + let jh1 = thread::spawn(|| {}); + jh0.join().unwrap(); + jh1.join().unwrap(); +} diff --git a/spacepackets b/spacepackets index 3970061..94489da 160000 --- a/spacepackets +++ b/spacepackets @@ -1 +1 @@ -Subproject commit 3970061ca1490658c3694384e57657a1dbe0cadd +Subproject commit 94489da00323dc6caf24e05e240c80fc10b5d8cc