35 Commits

Author SHA1 Message Date
778ae06bbd Merge pull request 'prepare v0.2.0' (#32) from prep_v0.2.0 into main
Reviewed-on: #32
2024-05-02 15:16:46 +02:00
078ac459f8 update README 2024-05-02 15:13:34 +02:00
51fbb2a46b prepare v0.2.0 2024-05-02 15:11:05 +02:00
d37da28efb Merge pull request 'Some bugfixes and docs' (#31) from some-bugfixes into main
Reviewed-on: #31
2024-05-02 15:08:32 +02:00
adc262c57f bump sat-rs 2024-05-02 15:08:01 +02:00
fd025da4b8 some documentation 2024-05-02 13:39:20 +02:00
9282526392 some bugfixes 2024-05-02 13:35:58 +02:00
f4d0a86d7d Merge pull request 'take image metadata generation' (#30) from take-img-metadata-impl into main
Reviewed-on: #30
2024-05-02 13:15:17 +02:00
be261da778 metadata test works 2024-05-02 13:14:51 +02:00
a350e96fc4 Merge remote-tracking branch 'origin/main' into take-img-metadata-impl 2024-05-02 12:37:00 +02:00
ebb58e4fd4 Merge pull request 'bump sat-rs and improve PUS stack' (#29) from bump-satrs-improve-pus-stack into main
Reviewed-on: #29
2024-05-02 12:35:37 +02:00
66a18e08e5 bump sat-rs and improve PUS stack 2024-05-02 12:32:57 +02:00
c4fffdfe2d take image metadata generation 2024-05-02 11:15:06 +02:00
62cc933f88 Merge pull request 'Move Images Commad and Bugfixes' (#28) from move-images-command into main
Reviewed-on: #28
Reviewed-by: lkoester <st167799@stud.uni-stuttgart.de>
2024-05-01 11:14:07 +02:00
6e4abc44d2 this should be it 2024-04-30 19:48:58 +02:00
ecea83fc4b implement move images command
- Also implement various importantr bugfixes for shutdown handling
2024-04-30 19:45:00 +02:00
36a42f95a5 move images command 2024-04-30 18:35:46 +02:00
7770347f4d Merge pull request 'Fixes and optimizations for camera' (#27) from fixes-and-optimizations-camera into main
Reviewed-on: #27
2024-04-30 17:34:27 +02:00
3bad422046 this is stupid 2024-04-30 15:54:34 +02:00
5b1392af4f update cargo.lock 2024-04-30 15:43:13 +02:00
087aed7f78 Merge remote-tracking branch 'origin/main' into fixes-and-optimizations-camera 2024-04-30 13:37:38 +02:00
26ecb6ee33 Merge pull request 'move generic struct' (#26) from move-generic-struct into main
Reviewed-on: #26
2024-04-30 13:33:16 +02:00
3173b18ceb updated sat-rs 2024-04-29 23:46:21 +02:00
e47523a734 move generic struct 2024-04-29 21:18:32 +02:00
9d8104be40 a lot of bugfixes 2024-04-29 18:52:11 +02:00
c9e5b9ffdb Merge pull request 'some more optimizations' (#25) from some-more-optimizations into main
Reviewed-on: #25
2024-04-29 16:47:24 +02:00
a095132d57 some more optimizations 2024-04-29 16:45:08 +02:00
e227fa1d01 Merge pull request 'tweaks and fixes' (#24) from some-tweaks-and-fixes into main
Reviewed-on: #24
2024-04-29 16:23:53 +02:00
ec4b16ed9e tweaks and fixes 2024-04-29 16:21:45 +02:00
d5ea52f9bf Merge pull request 'Use pydantic' (#23) from use-pydantic-for-python-serialization into main
Reviewed-on: #23
Reviewed-by: lkoester <st167799@stud.uni-stuttgart.de>
2024-04-29 15:24:49 +02:00
c184a5f0b3 Merge branch 'main' into use-pydantic-for-python-serialization 2024-04-29 15:24:38 +02:00
2cda1011f7 improvements for deployment script 2024-04-28 13:14:19 +02:00
2c34f46eca update dependencies 2024-04-28 12:47:08 +02:00
b4bf834c39 minor tweak 2024-04-28 12:45:01 +02:00
56b5076230 use pydantic instead of serde in Python 2024-04-28 12:39:34 +02:00
31 changed files with 1317 additions and 901 deletions

View File

@ -6,6 +6,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
# [v0.2.0] 2024-05-02
- Use released `sat-rs` version v0.2.0
## Added
- Taking an image now generates a metadata file.
- Implemented a command to move all camera image related files to the `toGroundLP` folder.
## Fixed
- Various important bugfixes for stop handling and home path handling
# [v0.1.1] 2024-04-26
Various smaller improvements and tweaks.

76
Cargo.lock generated
View File

@ -144,9 +144,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "cc"
version = "1.0.95"
version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b"
checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd"
[[package]]
name = "cfg-if"
@ -249,6 +249,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "delegate"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "derive-new"
version = "0.6.0"
@ -313,9 +324,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "2.0.2"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "fern"
@ -417,9 +428,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.14.3"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
"allocator-api2",
@ -505,17 +516,11 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.153"
version = "0.2.154"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346"
[[package]]
name = "linux-raw-sys"
@ -629,15 +634,15 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "ops-sat-rs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"chrono",
"delegate 0.12.0",
"derive-new",
"env_logger",
"fern",
"homedir",
"humantime",
"lazy_static",
"log",
"mio",
"num_enum",
@ -655,15 +660,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.9"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec 1.13.2",
"windows-targets 0.48.5",
"windows-targets 0.52.5",
]
[[package]]
@ -713,11 +718,11 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.4.1"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.5.0",
]
[[package]]
@ -776,15 +781,16 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]]
name = "satrs"
version = "0.2.0-rc.5"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2adc1d9369e3f7e21dabb3181e36c914d1a3f68f4900207a2baa129c2fd5baba"
checksum = "9de5d1f732620b9623289e0e5e3cb480f6bc8d8704f528ca2875dfe200ba90b2"
dependencies = [
"bus",
"cobs",
"crc",
"crossbeam-channel",
"delegate",
"delegate 0.10.0",
"derive-new",
"downcast-rs",
"dyn-clone",
"hashbrown",
@ -836,9 +842,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.198"
version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc"
checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
dependencies = [
"serde_derive",
]
@ -856,9 +862,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.198"
version = "1.0.200"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
dependencies = [
"proc-macro2",
"quote",
@ -911,9 +917,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.6"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys 0.52.0",
@ -927,7 +933,7 @@ checksum = "fa9f4d7df5fa3bc25ecfc95f1f612fc3d16c566df538d3d3c82db0e523096216"
dependencies = [
"chrono",
"crc",
"delegate",
"delegate 0.10.0",
"num-traits",
"num_enum",
"serde",
@ -1053,7 +1059,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.6",
"winnow 0.6.7",
]
[[package]]
@ -1333,9 +1339,9 @@ dependencies = [
[[package]]
name = "winnow"
version = "0.6.6"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352"
checksum = "14b9415ee827af173ebb3f15f9083df5a122eb93572ec28741fb153356ea2578"
dependencies = [
"memchr",
]

View File

@ -1,6 +1,6 @@
[package]
name = "ops-sat-rs"
version = "0.1.1"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -10,7 +10,7 @@ fern = "0.6"
toml = "0.8"
chrono = "0.4"
log = "0.4"
lazy_static = "1"
delegate = "0.12"
humantime = "2"
strum = { version = "0.26", features = ["derive"] }
thiserror = "1"
@ -24,7 +24,7 @@ socket2 = "0.5"
once_cell = "1.19"
[dependencies.satrs]
version = "0.2.0-rc.5"
version = "0.2.0"
# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
# branch = "main"
features = ["test_util"]

View File

@ -97,3 +97,28 @@ the following configuration:
```
You can run `pyclient.py -T` or `pyclient.py -h` for more information on the client application.
## Knowledge Base
### Home Path Handling
The OPS-SAT software filesystem handling will determine a home path at the start of the software.
This home path is used for various mechanisms inside the OPS-SAT infrastructure.
Currently, there are 3 possible configurations:
1. If the software is built with the `host` feature, the HOME path will be the current path the
software is run at.
2. If the `host` feature is not set and the `/home/exp278` folder exists, that folder will be
the home directory.
3. Otherwise, the default OS home directory will be the home directory.
### Application Shutdown Handling
The application can be stopped by creating a `stop-experiment` file either in the home path
specified in the previous section, or inside the temporary folder. There is also an action command
available to stop the application.
### Camera Handling
TODO

View File

@ -0,0 +1,96 @@
import enum
from typing import List
from spacepackets.ecss import PusTc
from tmtccmd.config import CmdTreeNode
from pydantic import BaseModel
from tmtccmd.tmtc import DefaultPusQueueHelper
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_action_cmd_header
class ActionId(enum.IntEnum):
DEFAULT_SINGLE = 1
BALANCED_SINGLE = 2
DEFAULT_SINGLE_FLATSAT = 3
BALANCED_SNGLE_FLATSAT = 4
CUSTOM_PARAMS = 5
class CameraParameters(BaseModel):
R: int
G: int
B: int
N: int
P: bool
E: int
W: int
def create_camera_node() -> CmdTreeNode:
cam_node = CmdTreeNode("cam", "OPS-SAT IMS1000 batch handler commands")
cam_node.add_child(
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode(
"default_single_flatsat",
"Default Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode(
"balanced_single_flatsat",
"Balanced Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
)
return cam_node
def create_cam_cmd(q: DefaultPusQueueHelper, cmd_path: List[str]):
assert len(cmd_path) >= 1
q.add_log_cmd(
"Sending PUS take image action request for command " + cmd_path[0] + " params."
)
data = bytearray()
if cmd_path[0] == "default_single":
data.extend(
make_action_cmd_header(UniqueId.CameraHandler, ActionId.DEFAULT_SINGLE)
)
elif cmd_path[0] == "balanced_single":
data.extend(
make_action_cmd_header(UniqueId.CameraHandler, ActionId.BALANCED_SINGLE)
)
elif cmd_path[0] == "default_single_flatsat":
data.extend(
make_action_cmd_header(
UniqueId.CameraHandler, ActionId.DEFAULT_SINGLE_FLATSAT
)
)
elif cmd_path[0] == "balanced_single_flatsat":
data.extend(
make_action_cmd_header(
UniqueId.CameraHandler, ActionId.BALANCED_SNGLE_FLATSAT
)
)
elif cmd_path[0] == "custom":
data.extend(
make_action_cmd_header(UniqueId.CameraHandler, ActionId.CUSTOM_PARAMS)
)
# TODO: Implement asking params from user.
# params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
# data.extend(params.model_dump_json().encode())
raise NotImplementedError()
else:
raise ValueError("unknown camera action {}", cmd_path[0])
return q.add_pus_tc(
PusTc(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)
)

View File

@ -1,17 +0,0 @@
import struct
from serde import Model, fields
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_unique_id
class CameraParameters(Model):
R: fields.Int()
G: fields.Int()
B: fields.Int()
N: fields.Int()
P: fields.Bool()
E: fields.Int()
W: fields.Int()
def serialize_for_uplink(self) -> bytearray:
return self.to_json().encode("utf-8")

View File

@ -24,7 +24,6 @@ class UniqueId(enum.IntEnum):
class EventSeverity(enum.IntEnum):
INFO = 0
LOW = 1
MEDIUM = 2

View File

@ -0,0 +1,53 @@
import enum
from typing import List
from spacepackets.ecss import PusTc
from tmtccmd.config import CmdTreeNode
from tmtccmd.tmtc import DefaultPusQueueHelper
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_action_cmd_header
class ActionId(enum.IntEnum):
STOP_EXPERIMENT = 1
DOWNLINK_LOG_FILE = 2
DOWNLINK_IMAGES_BY_MOVING = 3
EXECUTE_SHELL_CMD_BLOCKING = 4
class OpCode:
DOWNLINK_LOGS = "downlink_logs"
DOWNLINK_IMAGES_BY_MOVING = "move_image_files"
def create_controller_node():
controller_node = CmdTreeNode("controller", "Main OBSW Controller")
controller_node.add_child(
CmdTreeNode(OpCode.DOWNLINK_LOGS, "Downlink Logs via toGround folder")
)
controller_node.add_child(
CmdTreeNode(
OpCode.DOWNLINK_IMAGES_BY_MOVING,
"Downlink all image files via the toGroundLP folder",
)
)
return controller_node
def create_ctrl_cmd(q: DefaultPusQueueHelper, cmd_path: List[str]):
assert len(cmd_path) >= 1
data = bytearray()
if cmd_path[0] == OpCode.DOWNLINK_LOGS:
data.extend(
make_action_cmd_header(UniqueId.Controller, ActionId.DOWNLINK_LOG_FILE)
)
elif cmd_path[0] == OpCode.DOWNLINK_IMAGES_BY_MOVING:
data.extend(
make_action_cmd_header(
UniqueId.Controller, ActionId.DOWNLINK_IMAGES_BY_MOVING
)
)
else:
raise ValueError("unknown controller action {}", cmd_path[0])
return q.add_pus_tc(
PusTc(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)
)

View File

@ -10,12 +10,8 @@ from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
from opssat_tmtc.camera_params import CameraParameters
from opssat_tmtc.common import (
EXPERIMENT_APID,
UniqueId,
make_action_cmd_header,
)
from opssat_tmtc.camera import create_cam_cmd, create_camera_node
from opssat_tmtc.controller import create_controller_node, create_ctrl_cmd
_LOGGER = logging.getLogger(__name__)
@ -37,7 +33,6 @@ def create_set_mode_cmd(
def create_cmd_definition_tree() -> CmdTreeNode:
root_node = CmdTreeNode.root_node()
hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True)
@ -72,42 +67,8 @@ def create_cmd_definition_tree() -> CmdTreeNode:
)
root_node.add_child(scheduler_node)
action_node = CmdTreeNode("action", "Action Node")
cam_node = CmdTreeNode("take_image", "Take Image with IMS Imager")
cam_node.add_child(
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode(
"default_single_flatsat",
"Default Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode(
"balanced_single_flatsat",
"Balanced Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
)
action_node.add_child(cam_node)
controller_node = CmdTreeNode("controller", "Main OBSW Controller")
controller_node.add_child(
CmdTreeNode("downlink_logs", "Downlink Logs via toGround folder")
)
controller_node.add_child(
CmdTreeNode("downlink_last_img", "Downlink last image via toGroundLP folder")
)
action_node.add_child(controller_node)
root_node.add_child(action_node)
root_node.add_child(create_camera_node())
root_node.add_child(create_controller_node())
return root_node
@ -140,45 +101,10 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
)
if cmd_path_list[0] == "acs":
assert len(cmd_path_list) >= 2
if cmd_path_list[0] == "action":
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "take_image":
assert len(cmd_path_list) >= 3
q.add_log_cmd(
"Sending PUS take image action request with "
+ cmd_path_list[2]
+ " params."
)
data = bytearray()
if cmd_path_list[2] == "default_single":
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 1))
if cmd_path_list[2] == "balanced_single":
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 2))
if cmd_path_list[2] == "default_single_flatsat":
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 3))
if cmd_path_list[2] == "balanced_single_flatsat":
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 4))
if cmd_path_list[2] == "custom":
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 5))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
data.extend(params.serialize_for_uplink())
return q.add_pus_tc(
PusTelecommand(
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
)
)
if cmd_path_list[1] == "controller":
assert len(cmd_path_list) >= 3
data = bytearray()
if cmd_path_list[2] == "downlink_logs":
data.extend(make_action_cmd_header(UniqueId.Controller, 2))
if cmd_path_list[2] == "downlink_last_img":
data.extend(make_action_cmd_header(UniqueId.Controller, 3))
return q.add_pus_tc(
PusTelecommand(
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
)
)
if cmd_path_list[0] == "cam":
create_cam_cmd(q, cmd_path_list[1:])
if cmd_path_list[0] == "controller":
create_ctrl_cmd(q, cmd_path_list[1:])
def handle_set_mode_cmd(

View File

@ -14,12 +14,14 @@ authors = [
]
dependencies = [
"tmtccmd==8.0.0rc.2",
"serde==0.9.0"
"pydantic==2.7.1"
]
[tool.setuptools.packages]
find = {}
[tool.ruff]
extend-exclude = ["archive"]
[tool.ruff.lint]
ignore = ["E501"]
[tool.ruff.lint.extend-per-file-ignores]

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import socket
import json
import abc
import time
import select
@ -15,7 +16,7 @@ EXP_ID = 278
EXP_APID = 1024 + EXP_ID
EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)
OPSSAT_SERVER_PORT = 4096
OPSSAT_DEFAULT_SERVER_PORT = 4096
TMTC_SERVER_PORT = 4097
LOG_LEVEL = logging.INFO
@ -36,7 +37,17 @@ def main():
)
print("Starting OPS-SAT ground TMTC server")
KILL_SIGNAL.clear()
ops_sat_thread = OpsSatServer()
ops_sat_server_port = OPSSAT_DEFAULT_SERVER_PORT
with open("tmtc_conf.json") as cfg_file:
# Load JSON data
data = json.loads(cfg_file.read())
# Access the value of the tcpip_tcp_server_port key
maybe_ops_sat_server_port = data.get("tcpip_tcp_server_port")
if maybe_ops_sat_server_port is not None:
ops_sat_server_port = maybe_ops_sat_server_port
_LOGGER.info(f"creating OPS-SAT server on port {ops_sat_server_port}")
ops_sat_thread = OpsSatServer(ops_sat_server_port)
ops_sat_thread.start()
tmtc_thread = TmtcServer()
tmtc_thread.start()
@ -142,8 +153,9 @@ class BaseServer(Thread):
class OpsSatServer(BaseServer):
def __init__(self):
super().__init__("[OPS-SAT]", OPSSAT_SERVER_PORT)
def __init__(self, port: int):
self.port = port
super().__init__("[OPS-SAT]", port)
def handle_read_bytestream(self, analysis_deque: deque):
parsed_packets = parse_space_packets(analysis_deque, [EXP_PACKET_ID_TM])

0
pytmtc/tests/__init__.py Normal file
View File

27
pytmtc/tests/test_cam.py Normal file
View File

@ -0,0 +1,27 @@
from unittest import TestCase
from opssat_tmtc.camera_params import CameraParameters
TEST_CAM_PARAMS = CameraParameters(R=8, G=8, B=8, N=1, P=True, E=200, W=1000)
EXPECTED_JSON = '{"R":8,"G":8,"B":8,"N":1,"P":true,"E":200,"W":1000}'
class TestCamInterface(TestCase):
def test_serialization_to_dict(self):
model = TEST_CAM_PARAMS.model_dump()
self.assertEqual(model["R"], 8)
self.assertEqual(model["G"], 8)
self.assertEqual(model["B"], 8)
self.assertEqual(model["N"], 1)
self.assertEqual(model["P"], True)
self.assertEqual(model["E"], 200)
self.assertEqual(model["W"], 1000)
def test_serialization_to_json(self):
json = TEST_CAM_PARAMS.model_dump_json()
self.assertEqual(json, EXPECTED_JSON)
print(json)
def test_deserialization(self):
model_deserialized = CameraParameters.model_validate_json(EXPECTED_JSON)
self.assertEqual(TEST_CAM_PARAMS, model_deserialized)

View File

@ -1,20 +0,0 @@
import struct
from opssat_tmtc.camera_params import CameraParameters
from opssat_tmtc.common import make_unique_id, EXPERIMENT_APID
def test_serde_serialization():
# Example serializatn
data = bytearray(make_unique_id(EXPERIMENT_APID))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
serialized = params.to_json().encode("utf-8")
byte_string = bytearray(struct.pack("!{}s".format(len(serialized)), serialized))
print(byte_string)
print(params.serialize_for_uplink())
data.extend(params.serialize_for_uplink())
print(data)
# Example deserialization
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
deserialized_params = CameraParameters.from_json(data)
print(deserialized_params)

View File

@ -5,11 +5,12 @@ import argparse
import os
import sys
import time
import platform
from typing import Final
# TODO: Should we make this configurable?
BUILDER = "cross"
USE_SSHPASS = False
# This script can easily be adapted to other remote machines, Linux boards and
# remote configurations by tweaking / hardcoding these parameter, which generally are constant
@ -142,8 +143,8 @@ def bld_deploy_run(args):
# ssh_target_ident = f"{args.user}@{args.address}"
ssh_target_ident = "small_flatsat"
sshpass_cmd = ""
# if platform.system() != "Windows":
# sshpass_cmd = f"sshpass {sshpass_args}"
if USE_SSHPASS and platform.system() != "Windows":
sshpass_cmd = f"sshpass {sshpass_args}"
dest_path = f"{args.dest}/{args.app}"
if not args.source:
source_path = f"{os.getcwd()}/target/{args.tc}/{build_folder}/{args.app}"

View File

@ -1,7 +1,7 @@
use lazy_static::lazy_static;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use once_cell::sync::OnceCell;
use satrs::events::{EventU32TypedSev, SeverityInfo};
use satrs::res_code::ResultU16;
use satrs::spacepackets::PacketId;
use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode;
@ -28,8 +28,11 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER
pub const SPP_CLIENT_WIRETAPPING_RX: bool = false;
pub const SPP_CLIENT_WIRETAPPING_TX: bool = false;
pub const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION");
pub static TO_GROUND_FOLDER_DIR: OnceCell<PathBuf> = OnceCell::new();
pub static TO_GROUND_LP_FOLDER_DIR: OnceCell<PathBuf> = OnceCell::new();
pub static HOME_PATH: OnceCell<PathBuf> = OnceCell::new();
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
@ -40,41 +43,42 @@ pub enum CustomPusServiceId {
#[derive(Debug)]
pub enum GroupId {
Tmtc = 0,
Hk = 1,
Mode = 2,
Action = 3,
Controller = 4,
Generic = 0,
Tmtc = 1,
Hk = 2,
Mode = 3,
Action = 4,
Controller = 5,
Camera = 6,
}
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::new(GroupId::Tmtc as u16, 0);
pub const VERSION: Option<&str> = option_env!("CARGO_PKG_VERSION");
lazy_static! {
pub static ref HOME_PATH: PathBuf = {
let mut home_path = PathBuf::new();
pub fn set_up_home_path() -> PathBuf {
let mut home_path = PathBuf::new();
if cfg!(feature = "host") {
home_path = std::env::current_dir()
.expect("getting current dir failed")
.to_path_buf();
} else {
let home_path_default = homedir::get_my_home()
.expect("Getting home dir from OS failed.")
.expect("No home dir found.");
home_path.push(if Path::new(HOME_FOLDER_EXPERIMENT).exists() {
HOME_FOLDER_EXPERIMENT
if Path::new(HOME_FOLDER_EXPERIMENT).exists() {
home_path.push(HOME_FOLDER_EXPERIMENT);
} else {
home_path_default
.to_str()
.expect("Error converting to string.")
});
home_path
};
home_path = home_path_default;
}
}
HOME_PATH
.set(home_path.clone())
.expect("attempting to set once cell twice");
home_path
}
pub fn set_up_low_prio_ground_dir() {
#[cfg(feature = "host")]
let mut to_ground_lp_dir = std::env::current_dir().expect("getting current dir failed");
#[cfg(not(feature = "host"))]
let mut to_ground_lp_dir = HOME_PATH.clone();
pub fn set_up_low_prio_ground_dir(home_path: PathBuf) {
let mut to_ground_lp_dir = home_path.to_path_buf();
to_ground_lp_dir.push(TO_GROUND_LP_FOLDER_NAME);
if !Path::new(&to_ground_lp_dir).exists() {
log::info!(
@ -93,11 +97,8 @@ pub fn set_up_low_prio_ground_dir() {
.expect("attemting to set once cell twice");
}
pub fn set_up_ground_dir() {
#[cfg(feature = "host")]
let mut to_ground_dir = std::env::current_dir().expect("getting current dir failed");
#[cfg(not(feature = "host"))]
let mut to_ground_dir = HOME_PATH.clone();
pub fn set_up_ground_dir(home_path: PathBuf) {
let mut to_ground_dir = home_path.to_path_buf();
to_ground_dir.push(TO_GROUND_FOLDER_NAME);
if !Path::new(&to_ground_dir).exists() {
log::info!("creating to ground directory at {:?}", to_ground_dir);
@ -120,7 +121,7 @@ pub mod cfg_file {
path::{Path, PathBuf},
};
use super::{CONFIG_FILE_NAME, HOME_PATH, TCP_SPP_SERVER_PORT};
use super::{CONFIG_FILE_NAME, TCP_SPP_SERVER_PORT};
pub const SPP_CLIENT_PORT_CFG_KEY: &str = "tcp_spp_server_port";
@ -137,8 +138,8 @@ pub mod cfg_file {
}
}
pub fn create_app_config() -> AppCfg {
let mut cfg_path = HOME_PATH.clone();
pub fn create_app_config(base_path: PathBuf) -> AppCfg {
let mut cfg_path = base_path;
cfg_path.push(CONFIG_FILE_NAME);
let cfg_path_home = cfg_path.as_path();
let relevant_path = if Path::new(CONFIG_FILE_NAME).exists() {
@ -187,9 +188,11 @@ pub mod cfg_file {
}
}
#[resultcode]
pub const GENERIC_FAILED: ResultU16 = ResultU16::new(GroupId::Generic as u8, 1);
pub mod tmtc_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const INVALID_PUS_SERVICE: ResultU16 = ResultU16::new(GroupId::Tmtc as u8, 0);
@ -220,12 +223,12 @@ pub mod tmtc_err {
UNKNOWN_TARGET_ID_EXT,
ROUTING_ERROR_EXT,
NOT_ENOUGH_APP_DATA_EXT,
REQUEST_TIMEOUT_EXT,
];
}
pub mod action_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const INVALID_ACTION_ID: ResultU16 = ResultU16::new(GroupId::Action as u8, 0);
@ -235,7 +238,6 @@ pub mod action_err {
pub mod hk_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const TARGET_ID_MISSING: ResultU16 = ResultU16::new(GroupId::Hk as u8, 0);
@ -256,7 +258,6 @@ pub mod hk_err {
pub mod mode_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const WRONG_MODE: ResultU16 = ResultU16::new(GroupId::Mode as u8, 0);
@ -264,7 +265,6 @@ pub mod mode_err {
pub mod ctrl_err {
use super::*;
use satrs::res_code::ResultU16;
#[resultcode]
pub const INVALID_CMD_FORMAT: ResultU16 = ResultU16::new(GroupId::Controller as u8, 0);
@ -281,12 +281,56 @@ pub mod ctrl_err {
pub const IMAGE_NOT_FOUND_FOR_COPY: ResultU16 = ResultU16::new(GroupId::Controller as u8, 5);
#[resultcode]
pub const INVALID_LOGFILE_PATH: ResultU16 = ResultU16::new(GroupId::Controller as u8, 6);
#[resultcode]
pub const IO_ERROR: ResultU16 = ResultU16::new(GroupId::Controller as u8, 7);
pub const CTRL_ERR_RESULTS: &[ResultU16Info] = &[
INVALID_CMD_FORMAT_EXT,
SHELL_CMD_IO_ERROR_EXT,
SHELL_CMD_EXECUTION_FAILURE_EXT,
SHELL_CMD_INVALID_FORMAT_EXT,
FILESYSTEM_COPY_ERROR_EXT,
IMAGE_NOT_FOUND_FOR_COPY_EXT,
INVALID_LOGFILE_PATH_EXT,
IO_ERROR_EXT,
];
}
pub mod cam_error {
use super::*;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum CameraError {
#[error("Error taking image: {0}")]
TakeImageError(String),
#[error("error listing image files: {0}")]
ListFileError(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
}
#[resultcode]
pub const TAKE_IMAGE_ERROR: ResultU16 = ResultU16::new(GroupId::Camera as u8, 0);
#[resultcode]
pub const NO_DATA: ResultU16 = ResultU16::new(GroupId::Camera as u8, 1);
#[resultcode]
pub const ACTION_REQ_VARIANT_NOT_IMPL: ResultU16 = ResultU16::new(GroupId::Camera as u8, 2);
#[resultcode]
pub const DESERIALIZE_ERROR: ResultU16 = ResultU16::new(GroupId::Camera as u8, 3);
// TODO: Probably could be in a dedicated modules for these returnvalues.
#[resultcode]
pub const LIST_FILE_ERROR: ResultU16 = ResultU16::new(GroupId::Camera as u8, 4);
#[resultcode]
pub const IO_ERROR: ResultU16 = ResultU16::new(GroupId::Camera as u8, 5);
pub const CAM_ERR_RESULTS: &[ResultU16Info] = &[
TAKE_IMAGE_ERROR_EXT,
NO_DATA_EXT,
ACTION_REQ_VARIANT_NOT_IMPL_EXT,
DESERIALIZE_ERROR_EXT,
LIST_FILE_ERROR_EXT,
IO_ERROR_EXT,
];
}

View File

@ -14,6 +14,7 @@ use satrs::{
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
use std::io;
use std::{
path::{Path, PathBuf},
process::Command,
@ -21,8 +22,8 @@ use std::{
};
use ops_sat_rs::config::ctrl_err::{
FILESYSTEM_COPY_ERROR, IMAGE_NOT_FOUND_FOR_COPY, INVALID_LOGFILE_PATH,
SHELL_CMD_EXECUTION_FAILURE, SHELL_CMD_INVALID_FORMAT, SHELL_CMD_IO_ERROR,
FILESYSTEM_COPY_ERROR, INVALID_LOGFILE_PATH, IO_ERROR, SHELL_CMD_EXECUTION_FAILURE,
SHELL_CMD_INVALID_FORMAT, SHELL_CMD_IO_ERROR,
};
use crate::requests::CompositeRequest;
@ -38,26 +39,30 @@ pub struct ShellCmd<'a> {
pub enum ActionId {
StopExperiment = 1,
DownlinkLogfile = 2,
DownlinkImages = 3,
/// Standard command to download the images made by the camera. It moves all image related
/// files inside the home folder into the toGroundLP (low priority to ground download) folder.
DownlinkImagesByMoving = 3,
ExecuteShellCommandBlocking = 4,
}
#[derive(Debug)]
pub struct ControllerPathCollection {
pub home_path: PathBuf,
pub stop_file_home_path: PathBuf,
pub stop_file_tmp_path: PathBuf,
pub to_ground_dir: PathBuf,
pub to_ground_low_prio_dir: PathBuf,
}
impl Default for ControllerPathCollection {
fn default() -> Self {
let mut home_path_stop_file = PathBuf::new();
home_path_stop_file.push(HOME_PATH.as_path());
impl ControllerPathCollection {
pub fn new(base_path: &Path) -> Self {
let home_path = base_path.to_path_buf();
let mut home_path_stop_file = home_path.clone();
home_path_stop_file.push(STOP_FILE_NAME);
let mut tmp_path_stop_file = temp_dir();
tmp_path_stop_file.push(STOP_FILE_NAME);
Self {
home_path: home_path.clone(),
stop_file_home_path: home_path_stop_file,
stop_file_tmp_path: tmp_path_stop_file,
to_ground_dir: TO_GROUND_FOLDER_DIR
@ -71,6 +76,7 @@ impl Default for ControllerPathCollection {
}
}
}
pub struct ExperimentController {
pub composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
pub action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
@ -140,41 +146,29 @@ impl ExperimentController {
self.handle_shell_command_execution(&requestor, &action_req);
}
ActionId::DownlinkLogfile => self.handle_downlink_logfile(&requestor, &action_req),
// downlink images, default will be the last image, otherwise specified counting down (2 = second to last image, etc.)
ActionId::DownlinkImages => self.handle_downlink_cam_image(&requestor, &action_req),
ActionId::DownlinkImagesByMoving => {
let result = self.handle_downlink_cam_image_by_moving(&requestor, &action_req);
if let Err(e) = result {
send_completion_failure(IO_ERROR, Some(e.to_string().into()));
}
}
}
}
pub fn handle_downlink_cam_image(
pub fn handle_downlink_cam_image_by_moving(
&self,
requestor: &MessageMetadata,
action_req: &ActionRequest,
) {
log::info!("copying images into low priority downlink folder");
let image_path_result = match &action_req.variant {
ActionRequestVariant::VecData(data) => {
let index = data[0];
get_latest_image(index as usize)
}
_ => get_latest_image(0),
};
match image_path_result {
Ok(image_path) => self.handle_file_copy(
requestor,
action_req,
&image_path,
&self.paths.to_ground_dir,
),
Err(e) => {
log::warn!("could not retrieve image path: {}", e);
self.send_completion_failure(
requestor,
action_req,
IMAGE_NOT_FOUND_FOR_COPY,
Some(e.to_string().into()),
);
}
}
) -> io::Result<()> {
log::info!("moving images into low priority downlink folder");
let num_moved_files = move_images_inside_home_dir_to_low_prio_ground_dir(
HOME_PATH.get().unwrap(),
&self.paths.to_ground_low_prio_dir,
)?;
log::info!("moved {} image files", num_moved_files);
// TODO: Trigger event containing the number of moved files?
self.send_completion_success(requestor, action_req);
Ok(())
}
pub fn handle_downlink_logfile(&self, requestor: &MessageMetadata, action_req: &ActionRequest) {
@ -318,7 +312,34 @@ impl ExperimentController {
}
}
pub fn move_images_inside_home_dir_to_low_prio_ground_dir(
home_dir: &Path,
low_prio_target_dir: &Path,
) -> io::Result<u32> {
let mut moved_files = 0;
for dir_entry_result in std::fs::read_dir(home_dir)? {
if let Ok(dir_entry) = &dir_entry_result {
if let Ok(file_type) = dir_entry.file_type() {
if file_type.is_file() {
let path_name = dir_entry.file_name();
let path_name_str = path_name.to_string_lossy();
if path_name_str.contains("img_msec_") {
let mut target_path = PathBuf::new();
target_path.push(low_prio_target_dir);
target_path.push(&path_name);
log::info!("moving file {}", &path_name_str);
std::fs::rename(dir_entry.path(), target_path)?;
moved_files += 1;
}
}
}
}
}
Ok(moved_files)
}
// TODO no idea if this works in any way shape or form
#[allow(dead_code)]
pub fn get_latest_image(index: usize) -> Result<PathBuf, std::io::Error> {
// Get the most recently modified file
let mut png_files = std::fs::read_dir(HOME_FOLDER_EXPERIMENT)?
@ -387,6 +408,7 @@ mod tests {
to_ground_low_prio_dir.push("toGroundLP");
let test_paths = ControllerPathCollection {
home_path: test_tmp_dir.path().to_path_buf(),
stop_file_home_path,
stop_file_tmp_path,
to_ground_dir,

View File

@ -3,7 +3,6 @@ use std::sync::mpsc::{self};
use crate::pus::create_verification_reporter;
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::event_man::{EventMessageU32, EventRoutingError};
use satrs::params::WritableToBeBytes;
use satrs::pus::event::EventTmHookProvider;
use satrs::pus::verification::VerificationReporter;
use satrs::request::UniqueApidTargetId;
@ -42,6 +41,7 @@ pub struct PusEventHandler {
tm_sender: mpsc::Sender<PacketAsVec>,
time_provider: CdsTime,
timestamp: [u8; 7],
small_params_buf: [u8; 64],
verif_handler: VerificationReporter,
}
@ -82,6 +82,7 @@ impl PusEventHandler {
pus_event_man_rx,
time_provider: CdsTime::new_with_u16_days(0, 0),
timestamp: [0; 7],
small_params_buf: [0; 64],
verif_handler,
tm_sender,
}
@ -132,19 +133,17 @@ impl PusEventHandler {
// Perform the generation of PUS event packets
match self.pus_event_man_rx.try_recv() {
Ok(event_msg) => {
update_time(&mut self.time_provider, &mut self.timestamp);
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
param.to_vec().expect("failed to convert params to vec")
});
// We use the TM modification hook to set the sender APID for each event.
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
UniqueApidTargetId::from(event_msg.sender_id()).apid;
update_time(&mut self.time_provider, &mut self.timestamp);
self.pus_event_tm_creator
.generate_pus_event_tm_generic(
.generate_pus_event_tm_generic_with_generic_params(
&self.tm_sender,
&self.timestamp,
event_msg.event(),
Some(&param_vec),
&mut self.small_params_buf,
event_msg.params(),
)
.expect("Sending TM as event failed");
}
@ -174,7 +173,7 @@ impl EventHandler {
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid, 16),
&mut event_manager,
event_request_rx,
);

View File

@ -1,4 +1,3 @@
use crate::pus::action::send_data_reply;
/// Device handler implementation for the IMS-100 Imager used on the OPS-SAT mission.
///
/// from the [OPSSAT Experimenter Wiki](https://opssat1.esoc.esa.int/projects/experimenter-information/wiki/Camera_Introduction):
@ -25,23 +24,28 @@ use crate::pus::action::send_data_reply;
/// v Y
///
/// see also https://opssat1.esoc.esa.int/dmsf/files/6/view
use crate::pus::action::send_data_reply;
use crate::requests::CompositeRequest;
use derive_new::new;
use log::{debug, info};
use log::info;
use num_enum::TryFromPrimitive;
use ops_sat_rs::config::cam_error::{self, CameraError};
use ops_sat_rs::config::GENERIC_FAILED;
use ops_sat_rs::TimeStampHelper;
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::hk::HkRequest;
use satrs::params::Params;
use satrs::pus::action::{ActionReplyPus, ActionReplyVariant};
use satrs::pus::EcssTmtcError;
use satrs::request::{GenericMessage, MessageMetadata, UniqueApidTargetId};
use satrs::res_code::ResultU16;
use satrs::tmtc::PacketAsVec;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Formatter;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use std::sync::mpsc;
use std::time::{SystemTime, UNIX_EPOCH};
// const IMS_TESTAPP: &str = "scripts/ims100_testapp";
const IMS_TESTAPP: &str = "ims100_testapp";
const DEFAULT_SINGLE_CAM_PARAMS: CameraPictureParameters = CameraPictureParameters {
@ -88,8 +92,9 @@ const BALANCED_SINGLE_FLATSAT_CAM_PARAMS: CameraPictureParameters = CameraPictur
// TODO ls -l via cfdp
// TODO howto downlink
#[derive(Debug)]
pub enum CameraActionId {
#[derive(Debug, TryFromPrimitive)]
#[repr(u32)]
pub enum ActionId {
DefaultSingle = 1,
BalancedSingle = 2,
DefaultSingleFlatSat = 3,
@ -97,34 +102,9 @@ pub enum CameraActionId {
CustomParameters = 5,
}
impl TryFrom<u32> for CameraActionId {
type Error = ();
fn try_from(value: u32) -> Result<Self, Self::Error> {
match value {
value if value == CameraActionId::DefaultSingle as u32 => {
Ok(CameraActionId::DefaultSingle)
}
value if value == CameraActionId::BalancedSingle as u32 => {
Ok(CameraActionId::BalancedSingle)
}
value if value == CameraActionId::DefaultSingleFlatSat as u32 => {
Ok(CameraActionId::DefaultSingleFlatSat)
}
value if value == CameraActionId::BalancedSingleFlatSat as u32 => {
Ok(CameraActionId::BalancedSingleFlatSat)
}
value if value == CameraActionId::CustomParameters as u32 => {
Ok(CameraActionId::CustomParameters)
}
_ => Err(()),
}
}
}
// TODO what happens if limits are exceded
#[allow(non_snake_case)]
#[derive(Debug, Serialize, Deserialize, new)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, new)]
pub struct CameraPictureParameters {
pub R: u8,
pub G: u8,
@ -135,75 +115,63 @@ pub struct CameraPictureParameters {
pub W: u32, // wait time between pictures in ms, max: 40000
}
#[derive(Debug)]
#[allow(dead_code)]
pub enum CameraError {
TakeImageError,
NoDataSent,
VariantNotImplemented,
DeserializeError,
ListFileError,
IoError(std::io::Error),
EcssTmtcError(EcssTmtcError),
pub trait TakeImageExecutor {
fn take_image(&self, param: &CameraPictureParameters) -> io::Result<(Command, Output)>;
}
impl From<std::io::Error> for CameraError {
fn from(value: std::io::Error) -> Self {
Self::IoError(value)
#[derive(Default)]
pub struct Ims100ImageExecutor {}
pub fn build_take_image_command(param: &CameraPictureParameters) -> Command {
let mut cmd = Command::new(IMS_TESTAPP);
cmd.arg("-R")
.arg(param.R.to_string())
.arg("-G")
.arg(param.G.to_string())
.arg("-B")
.arg(param.B.to_string())
.arg("-c")
.arg("/dev/cam_tty")
.arg("-m")
.arg("/dev/cam_sd")
.arg("-v")
.arg("0")
.arg("-n")
.arg(param.N.to_string());
if param.P {
cmd.arg("-p");
}
cmd.arg("-e")
.arg(param.E.to_string())
.arg("-w")
.arg(param.W.to_string());
cmd
}
impl TakeImageExecutor for Ims100ImageExecutor {
fn take_image(&self, param: &CameraPictureParameters) -> io::Result<(Command, Output)> {
let mut cmd = build_take_image_command(param);
info!("taking image with command: {cmd:?}");
let output = cmd.output()?;
Ok((cmd, output))
}
}
impl From<EcssTmtcError> for CameraError {
fn from(value: EcssTmtcError) -> Self {
Self::EcssTmtcError(value)
}
}
impl fmt::Display for CameraError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CameraError::TakeImageError => {
write!(f, "Error taking image.")
}
CameraError::NoDataSent => {
write!(f, "No data sent.")
}
CameraError::VariantNotImplemented => {
write!(f, "Request variant not implemented.")
}
CameraError::DeserializeError => {
write!(f, "Unable to deserialize parameters.")
}
CameraError::ListFileError => {
write!(f, "Error listing image files.")
}
CameraError::IoError(io_error) => {
write!(f, "{}", io_error)
}
CameraError::EcssTmtcError(ecss_tmtc_error) => {
write!(f, "{}", ecss_tmtc_error)
}
}
}
}
#[allow(dead_code)]
#[derive(Debug)]
pub struct IMS100BatchHandler {
pub struct Ims100BatchHandler<ImgExecutor: TakeImageExecutor = Ims100ImageExecutor> {
id: UniqueApidTargetId,
// mode_interface: MpscModeLeafInterface,
pub image_executor: ImgExecutor,
pub home_path: PathBuf,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
// hk_reply_sender: mpsc::Sender<GenericMessage<HkReply>>,
tm_tx: mpsc::Sender<PacketAsVec>,
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
stamp_helper: TimeStampHelper,
}
#[allow(non_snake_case)]
#[allow(dead_code)]
impl IMS100BatchHandler {
impl<ImgExecutor: TakeImageExecutor> Ims100BatchHandler<ImgExecutor> {
pub fn new(
id: UniqueApidTargetId,
image_executor: ImgExecutor,
home_path: &Path,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
tm_tx: mpsc::Sender<PacketAsVec>,
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
@ -211,6 +179,8 @@ impl IMS100BatchHandler {
) -> Self {
Self {
id,
image_executor,
home_path: home_path.to_path_buf(),
composite_request_rx,
tm_tx,
action_reply_tx,
@ -222,7 +192,6 @@ impl IMS100BatchHandler {
self.stamp_helper.update_from_now();
// Handle requests.
self.handle_composite_requests();
// self.handle_mode_requests();
}
pub fn handle_composite_requests(&mut self) {
@ -233,11 +202,7 @@ impl IMS100BatchHandler {
self.handle_hk_request(&msg.requestor_info, hk_request);
}
CompositeRequest::Action(action_request) => {
if let Err(e) =
self.handle_action_request(&msg.requestor_info, action_request)
{
log::warn!("camera action request IO error: {e}");
}
self.handle_action_request(&msg.requestor_info, action_request);
}
},
Err(e) => match e {
@ -263,76 +228,161 @@ impl IMS100BatchHandler {
&mut self,
requestor_info: &MessageMetadata,
action_request: &ActionRequest,
) -> Result<(), CameraError> {
let param =
match CameraActionId::try_from(action_request.action_id).expect("Invalid action id") {
CameraActionId::DefaultSingle => DEFAULT_SINGLE_CAM_PARAMS,
CameraActionId::BalancedSingle => BALANCED_SINGLE_CAM_PARAMS,
CameraActionId::DefaultSingleFlatSat => DEFAULT_SINGLE_FLATSAT_CAM_PARAMS,
CameraActionId::BalancedSingleFlatSat => BALANCED_SINGLE_FLATSAT_CAM_PARAMS,
CameraActionId::CustomParameters => match &action_request.variant {
ActionRequestVariant::NoData => return Err(CameraError::NoDataSent),
ActionRequestVariant::StoreData(_) => {
// let param = serde_json::from_slice()
// TODO implement non dynamic version
return Err(CameraError::VariantNotImplemented);
}
ActionRequestVariant::VecData(data) => {
let param: serde_json::Result<CameraPictureParameters> =
serde_json::from_slice(data.as_slice());
match param {
Ok(param) => param,
Err(_) => {
return Err(CameraError::DeserializeError);
}
) {
let param = match ActionId::try_from(action_request.action_id).expect("Invalid action id") {
ActionId::DefaultSingle => DEFAULT_SINGLE_CAM_PARAMS,
ActionId::BalancedSingle => BALANCED_SINGLE_CAM_PARAMS,
ActionId::DefaultSingleFlatSat => DEFAULT_SINGLE_FLATSAT_CAM_PARAMS,
ActionId::BalancedSingleFlatSat => BALANCED_SINGLE_FLATSAT_CAM_PARAMS,
ActionId::CustomParameters => match &action_request.variant {
ActionRequestVariant::NoData => {
self.send_completion_failure(
requestor_info,
action_request,
cam_error::NO_DATA,
None,
);
return;
}
ActionRequestVariant::VecData(data) => {
let param: serde_json::Result<CameraPictureParameters> =
serde_json::from_slice(data.as_slice());
match param {
Ok(param) => param,
Err(e) => {
self.send_completion_failure(
requestor_info,
action_request,
cam_error::DESERIALIZE_ERROR,
Some(e.to_string().into()),
);
return;
}
}
_ => return Err(CameraError::VariantNotImplemented),
},
};
let output = self.take_picture(param)?;
info!("Sending action reply!");
send_data_reply(self.id, output.stdout, &self.stamp_helper, &self.tm_tx)?;
self.action_reply_tx
.send(GenericMessage::new(
*requestor_info,
ActionReplyPus::new(action_request.action_id, ActionReplyVariant::Completed),
))
.unwrap();
}
_ => {
self.send_completion_failure(
requestor_info,
action_request,
cam_error::ACTION_REQ_VARIANT_NOT_IMPL,
None,
);
return;
}
},
};
match self.take_picture(&param) {
Ok((cmd, ref output)) => {
self.send_completion_success(requestor_info, action_request);
if let Err(e) =
send_data_reply(self.id, &output.stdout, &self.stamp_helper, &self.tm_tx)
{
log::error!("sending data reply unexpectedly failed: {e}");
}
if let Err(e) = self.create_metadata_file(cmd, &param) {
// TODO: Generate event?
log::error!("issue creating metadata file: {e}");
}
}
Err(e) => match e {
CameraError::TakeImageError(ref err_str) => {
self.send_completion_failure(
requestor_info,
action_request,
cam_error::TAKE_IMAGE_ERROR,
Some(err_str.to_string().into()),
);
}
CameraError::IoError(ref e) => {
self.send_completion_failure(
requestor_info,
action_request,
cam_error::IO_ERROR,
Some(e.to_string().into()),
);
}
_ => {
log::warn!("unexpected error: {:?}", e);
self.send_completion_failure(
requestor_info,
action_request,
GENERIC_FAILED,
None,
);
}
},
}
}
pub fn create_metadata_file(
&mut self,
cmd: Command,
param: &CameraPictureParameters,
) -> io::Result<()> {
let now = SystemTime::now();
let unix_timestamp = now.duration_since(UNIX_EPOCH);
if unix_timestamp.is_err() {
log::error!("failed to get unix timestamp, time went backwards?");
return Ok(());
}
let unix_timestamp = unix_timestamp.unwrap().as_millis();
let mut metadata_path = self.home_path.clone();
metadata_path.push(format!("img_msec_{}.txt", unix_timestamp));
let mut file = std::fs::File::create(metadata_path)?;
writeln!(file, "time: {}", humantime::format_rfc3339_seconds(now))?;
writeln!(file, "cmd params: {:?}", param)?;
writeln!(file, "cmd: {:?}", cmd)?;
Ok(())
}
pub fn take_picture(&mut self, param: CameraPictureParameters) -> Result<Output, CameraError> {
info!("Taking image!");
let mut cmd = Command::new(IMS_TESTAPP);
cmd.arg("-R")
.arg(&param.R.to_string())
.arg("-G")
.arg(&param.G.to_string())
.arg("-B")
.arg(&param.B.to_string())
.arg("-c")
.arg("/dev/cam_tty")
.arg("-m")
.arg("/dev/cam_sd")
.arg("-v")
.arg("0")
.arg("-n")
.arg(&param.N.to_string());
if param.P {
cmd.arg("-p");
pub fn send_completion_success(&self, requestor: &MessageMetadata, action_req: &ActionRequest) {
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
*requestor,
action_req.action_id,
ActionReplyVariant::Completed,
));
if result.is_err() {
log::error!("sending action reply failed");
}
cmd.arg("-e")
.arg(&param.E.to_string())
.arg("-w")
.arg(&param.W.to_string());
let output = cmd.output()?;
debug!("Imager Output: {}", String::from_utf8_lossy(&output.stdout));
Ok(output)
}
pub fn send_completion_failure(
&self,
requestor: &MessageMetadata,
action_req: &ActionRequest,
error_code: ResultU16,
params: Option<Params>,
) {
let result = self.action_reply_tx.send(GenericMessage::new_action_reply(
*requestor,
action_req.action_id,
ActionReplyVariant::CompletionFailed { error_code, params },
));
if result.is_err() {
log::error!("sending action reply failed");
}
}
pub fn take_picture(
&mut self,
param: &CameraPictureParameters,
) -> Result<(Command, Output), CameraError> {
let (cmd, output) = self.image_executor.take_image(param)?;
info!("imager cmd status: {}", &output.status);
info!("imager output: {}", String::from_utf8_lossy(&output.stdout));
let mut error_string = String::new();
if !output.stderr.is_empty() {
error_string = String::from_utf8_lossy(&output.stderr).to_string();
log::warn!("imager error: {}", error_string);
}
if !output.status.success() {
return Err(CameraError::TakeImageError(error_string.to_string()));
}
Ok((cmd, output))
}
#[allow(dead_code)]
pub fn list_current_images(&self) -> Result<Vec<String>, CameraError> {
let output = Command::new("ls").arg("-l").arg("*.png").output()?;
@ -341,91 +391,127 @@ impl IMS100BatchHandler {
let files: Vec<String> = output_str.lines().map(|s| s.to_string()).collect();
Ok(files)
} else {
Err(CameraError::ListFileError)
Err(CameraError::ListFileError(
String::from_utf8_lossy(&output.stderr).to_string(),
))
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn take_picture_from_str(
&mut self,
R: &str,
G: &str,
B: &str,
N: &str,
P: &str,
E: &str,
W: &str,
) -> Result<(), CameraError> {
let mut cmd = Command::new("ims100_testapp");
cmd.arg("-R")
.arg(R)
.arg("-G")
.arg(G)
.arg("-B")
.arg(B)
.arg("-c")
.arg("/dev/cam_tty")
.arg("-m")
.arg("/dev/cam_sd")
.arg("-v")
.arg("0")
.arg("-n")
.arg(N)
.arg(P)
.arg("-e")
.arg(E)
.arg("-w")
.arg(W);
let output = cmd.output()?;
debug!("{}", String::from_utf8_lossy(&output.stdout));
Ok(())
impl Ims100BatchHandler {
pub fn new_with_default_img_executor(
id: UniqueApidTargetId,
home_path: &Path,
composite_request_rx: mpsc::Receiver<GenericMessage<CompositeRequest>>,
tm_tx: mpsc::Sender<PacketAsVec>,
action_reply_tx: mpsc::Sender<GenericMessage<ActionReplyPus>>,
stamp_helper: TimeStampHelper,
) -> Self {
Self::new(
id,
Ims100ImageExecutor::default(),
home_path,
composite_request_rx,
tm_tx,
action_reply_tx,
stamp_helper,
)
}
}
#[cfg(test)]
mod tests {
use crate::handlers::camera::{
CameraActionId, CameraPictureParameters, IMS100BatchHandler,
DEFAULT_SINGLE_FLATSAT_CAM_PARAMS,
ActionId, CameraPictureParameters, Ims100BatchHandler, DEFAULT_SINGLE_FLATSAT_CAM_PARAMS,
};
use crate::requests::CompositeRequest;
use ops_sat_rs::config::components::CAMERA_HANDLER;
use ops_sat_rs::TimeStampHelper;
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::pus::action::ActionReplyPus;
use satrs::pus::action::{ActionReplyPus, ActionReplyVariant};
use satrs::request::{GenericMessage, MessageMetadata};
use satrs::tmtc::PacketAsVec;
use satrs::ComponentId;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::unix::process::ExitStatusExt;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use tempfile::{tempdir, TempDir};
fn create_handler() -> (
IMS100BatchHandler,
Sender<GenericMessage<CompositeRequest>>,
Receiver<PacketAsVec>,
Receiver<GenericMessage<ActionReplyPus>>,
) {
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel();
let (action_reply_tx, action_reply_rx) = mpsc::channel();
let time_helper = TimeStampHelper::default();
let cam_handler: IMS100BatchHandler = IMS100BatchHandler::new(
CAMERA_HANDLER,
composite_request_rx,
tm_tx,
action_reply_tx,
time_helper,
);
(cam_handler, composite_request_tx, tm_rx, action_reply_rx)
use super::{build_take_image_command, TakeImageExecutor};
const REQUESTOR_ID: ComponentId = 1;
#[derive(Default)]
struct Ims100TestImageExecutor {
pub called_with_params: RefCell<VecDeque<CameraPictureParameters>>,
}
impl TakeImageExecutor for Ims100TestImageExecutor {
fn take_image(
&self,
param: &CameraPictureParameters,
) -> std::io::Result<(std::process::Command, std::process::Output)> {
let mut param_deque = self.called_with_params.borrow_mut();
param_deque.push_back(param.clone());
// We fake the test output, with no way to execute the actual command.
let output = std::process::Output {
status: std::process::ExitStatus::from_raw(0),
stdout: Vec::new(),
stderr: Vec::new(),
};
// We could generate the files as they are generated by the real batch handler.. But
// I think it's okay to verify that the function is called with the correct parameters
// and the metadata file is created for now.
Ok((build_take_image_command(param), output))
}
}
#[allow(dead_code)]
struct Ims100Testbench {
pub handler: Ims100BatchHandler<Ims100TestImageExecutor>,
pub tmp_home_dir: TempDir,
pub composite_req_tx: mpsc::Sender<GenericMessage<CompositeRequest>>,
pub tm_receiver: mpsc::Receiver<PacketAsVec>,
pub action_reply_rx: mpsc::Receiver<GenericMessage<ActionReplyPus>>,
}
impl Default for Ims100Testbench {
fn default() -> Self {
let tmp_home_dir = tempdir().expect("errror creating temp directory");
let (composite_request_tx, composite_request_rx) = mpsc::channel();
let (tm_tx, tm_rx) = mpsc::channel();
let (action_reply_tx, action_reply_rx) = mpsc::channel();
let time_helper = TimeStampHelper::default();
let cam_handler = Ims100BatchHandler::new(
CAMERA_HANDLER,
Ims100TestImageExecutor::default(),
tmp_home_dir.path(),
composite_request_rx,
tm_tx,
action_reply_tx,
time_helper,
);
// Even though we set the temporary home directory into HOME_PATH, we still need to
// cache the TempDir, so it is not dropped.
Ims100Testbench {
handler: cam_handler,
tmp_home_dir,
composite_req_tx: composite_request_tx,
tm_receiver: tm_rx,
action_reply_rx,
}
}
}
#[test]
fn command_line_execution() {
let (mut cam_handler, req_tx, tm_rx, action_reply_rx) = create_handler();
cam_handler
.take_picture(DEFAULT_SINGLE_FLATSAT_CAM_PARAMS)
let mut testbench = Ims100Testbench::default();
testbench
.handler
.take_picture(&DEFAULT_SINGLE_FLATSAT_CAM_PARAMS)
.unwrap();
}
@ -438,34 +524,94 @@ mod tests {
}
#[test]
fn test_action_req() {
let (mut cam_handler, req_tx, tm_rx, action_reply_rx) = create_handler();
fn test_take_image_action_req() {
let request_id = 5;
let mut testbench = Ims100Testbench::default();
let data = serde_json::to_string(&DEFAULT_SINGLE_FLATSAT_CAM_PARAMS).unwrap();
let req = ActionRequest::new(
CameraActionId::CustomParameters as u32,
ActionId::CustomParameters as u32,
ActionRequestVariant::VecData(data.as_bytes().to_vec()),
);
cam_handler
.handle_action_request(&MessageMetadata::new(1, 1), &req)
.unwrap();
testbench
.handler
.handle_action_request(&MessageMetadata::new(request_id, REQUESTOR_ID), &req);
let action_reply = testbench
.action_reply_rx
.try_recv()
.expect("expected action reply");
assert!(matches!(
action_reply.message.variant,
ActionReplyVariant::Completed
));
assert_eq!(action_reply.request_id(), request_id);
assert_eq!(action_reply.sender_id(), REQUESTOR_ID);
let mut image_executor = testbench
.handler
.image_executor
.called_with_params
.borrow_mut();
let called_params = image_executor.pop_front().expect("expected called params");
assert_eq!(called_params, DEFAULT_SINGLE_FLATSAT_CAM_PARAMS);
let mut detected_metadata_file = false;
for dir_entry_result in std::fs::read_dir(&testbench.handler.home_path)
.unwrap_or_else(|_| panic!("can not read {:?}", testbench.handler.home_path.as_path()))
{
if let Ok(dir_entry) = &dir_entry_result {
if let Ok(file_type) = dir_entry.file_type() {
if file_type.is_file() {
let path_name = dir_entry.file_name();
let path_name_str = path_name.to_string_lossy();
if path_name_str.contains("img_msec_") {
let file = File::open(dir_entry.path()).expect("file not found");
let buf_reader = BufReader::new(file);
for (idx, line) in buf_reader.lines().enumerate() {
let line = line.expect("line is not proper string");
if idx == 0 {
assert!(line.contains("time:"));
// Tricky to check, would have to mock this.. I think it's okay
// for now.
}
if idx == 1 {
assert!(line.contains("cmd params:"));
assert!(line.contains(&format!(
"{:?}",
&DEFAULT_SINGLE_FLATSAT_CAM_PARAMS
)));
}
if idx == 2 {
assert!(line.contains("cmd:"));
let cmd = build_take_image_command(
&DEFAULT_SINGLE_FLATSAT_CAM_PARAMS,
);
let cmd_str = format!("{:?}", cmd);
assert!(line.contains(&cmd_str));
}
}
detected_metadata_file = true;
}
}
}
}
}
assert!(detected_metadata_file, "no metadata file was generated");
}
#[test]
fn test_action_req_channel() {
let (mut cam_handler, req_tx, tm_rx, action_reply_rx) = create_handler();
let mut testbench = Ims100Testbench::default();
let data = serde_json::to_string(&DEFAULT_SINGLE_FLATSAT_CAM_PARAMS).unwrap();
let req = ActionRequest::new(
CameraActionId::CustomParameters as u32,
ActionId::CustomParameters as u32,
ActionRequestVariant::VecData(data.as_bytes().to_vec()),
);
let req = CompositeRequest::Action(req);
req_tx
testbench
.composite_req_tx
.send(GenericMessage::new(MessageMetadata::new(1, 1), req))
.unwrap();
cam_handler.periodic_operation();
testbench.handler.periodic_operation();
}
}

View File

@ -25,9 +25,10 @@ pub enum ClientError {
Io(#[from] io::Error),
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum ClientResult {
Ok,
AttemptedReconnection,
ConnectionLost,
}
@ -77,7 +78,6 @@ impl TcpSppClientCommon {
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
println!("god fuckikng damn it");
log::error!("TM sender to TCP client has disconnected");
break;
}
@ -91,6 +91,7 @@ impl TcpSppClientCommon {
pub struct TcpSppClientStd {
common: TcpSppClientCommon,
read_and_idle_delay: Duration,
reconnect_flag: bool,
// Optional to allow periodic reconnection attempts on the TCP server.
stream: Option<StdTcpStream>,
}
@ -114,6 +115,7 @@ impl TcpSppClientStd {
tc_source_tx,
validator: SimpleSpValidator::new(TcpComponent::Client, valid_ids.to_vec()),
},
reconnect_flag: false,
read_and_idle_delay: read_timeout,
stream: None,
};
@ -143,17 +145,33 @@ impl TcpSppClientStd {
}
pub fn operation(&mut self) -> Result<ClientResult, ClientError> {
let result = self.operation_inner();
if let Ok(client_result) = &result {
if *client_result != ClientResult::Ok {
std::thread::sleep(self.read_and_idle_delay);
}
}
result
}
fn operation_inner(&mut self) -> Result<ClientResult, ClientError> {
if let Some(client) = &mut self.stream {
// Write TM first before blocking on the read call.
self.common.write_to_server(client)?;
match client.read(&mut self.common.read_buf) {
// Not sure if this can happen or this is actually an error condition..
Ok(0) => {
log::info!("server closed connection");
// To avoid spam.
if !self.reconnect_flag {
log::info!("server closed connection");
}
self.stream = None;
return Ok(ClientResult::ConnectionLost);
}
Ok(read_bytes) => self.common.handle_read_bytstream(read_bytes)?,
Ok(read_bytes) => {
self.reconnect_flag = false;
self.common.handle_read_bytstream(read_bytes)?;
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut
{
@ -170,10 +188,14 @@ impl TcpSppClientStd {
}
} else {
if self.attempt_connect(false)? {
log::info!("reconnected to server succesfully");
// To avoid spam.
if !self.reconnect_flag {
log::info!("reconnected to server succesfully");
}
self.reconnect_flag = true;
return self.operation();
}
std::thread::sleep(self.read_and_idle_delay);
return Ok(ClientResult::AttemptedReconnection);
}
Ok(ClientResult::Ok)

View File

@ -3,11 +3,10 @@ use std::sync::mpsc;
use log::{info, warn};
use satrs::hal::std::udp_server::{ReceiveResult, UdpTcServer};
use satrs::pus::HandlingStatus;
use satrs::queue::GenericSendError;
use satrs::tmtc::PacketAsVec;
use crate::pus::HandlingStatus;
pub trait UdpTmHandler {
fn send_tm_to_udp_client(&mut self, socket: &UdpSocket, recv_addr: &SocketAddr);
}
@ -126,7 +125,7 @@ mod tests {
let (tx, rx) = mpsc::channel();
let udp_tc_server = UdpTcServer::new(UDP_SERVER_ID, sock_addr, 2048, tx).unwrap();
let tm_handler = TestTmHandler::default();
let tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let _tm_handler_calls = tm_handler.addrs_to_send_to.clone();
let mut udp_dyn_server = UdpTmtcServer {
udp_tc_server,
tm_handler,

View File

@ -1,7 +1,6 @@
use std::{
env::temp_dir,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::{atomic::AtomicBool, mpsc, Arc},
thread,
time::Duration,
@ -12,7 +11,7 @@ use ops_sat_rs::config::{
cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
pool::create_sched_tc_pool,
set_up_ground_dir, set_up_low_prio_ground_dir,
set_up_ground_dir, set_up_home_path, set_up_low_prio_ground_dir,
tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
HOME_PATH, STOP_FILE_NAME, VALID_PACKET_ID_LIST, VERSION,
};
@ -33,7 +32,7 @@ use crate::{
PusTcDistributor, PusTcMpscRouter,
},
};
use crate::{handlers::camera::IMS100BatchHandler, pus::event::create_event_service};
use crate::{handlers::camera::Ims100BatchHandler, pus::event::create_event_service};
use crate::{
interface::tcp_server::{SyncTcpTmSource, TcpTask},
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
@ -58,10 +57,12 @@ fn main() {
let version_str = VERSION.unwrap_or("?");
println!("OPS-SAT Rust Experiment OBSW v{}", version_str);
setup_logger().expect("setting up logging with fern failed");
set_up_low_prio_ground_dir();
set_up_ground_dir();
let app_cfg = create_app_config();
let home_path = set_up_home_path();
set_up_low_prio_ground_dir(home_path.clone());
set_up_ground_dir(home_path.clone());
let app_cfg = create_app_config(home_path.clone());
info!("App Configuration: {:?}", app_cfg);
let stop_signal = Arc::new(AtomicBool::new(false));
@ -154,14 +155,20 @@ fn main() {
);
let sock_addr = SocketAddr::new(IpAddr::V4(OBSW_SERVER_ADDR), SERVER_PORT);
let udp_tc_server = UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone())
.expect("creating UDP TMTC server failed");
let mut udp_tmtc_server = UdpTmtcServer {
udp_tc_server,
tm_handler: DynamicUdpTmHandler {
tm_rx: tm_tcp_server_rx,
},
};
let udp_tc_server_result =
UdpTcServer::new(UDP_SERVER.id(), sock_addr, 2048, tc_source_tx.clone());
if udp_tc_server_result.is_err() {
log::error!("UDP server creation failed");
}
let mut opt_udp_tmtc_server = None;
if let Ok(udp_tc_server) = udp_tc_server_result {
opt_udp_tmtc_server = Some(UdpTmtcServer {
udp_tc_server,
tm_handler: DynamicUdpTmHandler {
tm_rx: tm_tcp_server_rx,
},
});
}
let tcp_server_cfg = ServerConfig::new(
TCP_SERVER.id(),
@ -188,8 +195,7 @@ fn main() {
stop_signal.clone(),
);
let mut home_path_stop_file = PathBuf::new();
home_path_stop_file.push(HOME_PATH.as_path());
let mut home_path_stop_file = home_path.clone();
home_path_stop_file.push(STOP_FILE_NAME);
let mut tmp_path_stop_file = temp_dir();
tmp_path_stop_file.push(STOP_FILE_NAME);
@ -197,7 +203,7 @@ fn main() {
controller_composite_rx,
pus_action_reply_tx.clone(),
stop_signal.clone(),
ControllerPathCollection::default(),
ControllerPathCollection::new(&home_path),
);
let mut tcp_spp_client = TcpSppClientStd::new(
@ -211,8 +217,9 @@ fn main() {
.expect("creating TCP SPP client failed");
let timestamp_helper = TimeStampHelper::default();
let mut camera_handler: IMS100BatchHandler = IMS100BatchHandler::new(
let mut camera_handler: Ims100BatchHandler = Ims100BatchHandler::new_with_default_img_executor(
CAMERA_HANDLER,
HOME_PATH.get().unwrap(),
camera_composite_rx,
tm_funnel_tx.clone(),
pus_action_reply_tx.clone(),
@ -243,7 +250,9 @@ fn main() {
.spawn(move || {
info!("Running UDP server on port {SERVER_PORT}");
loop {
udp_tmtc_server.periodic_operation();
if let Some(ref mut udp_tmtc_server) = opt_udp_tmtc_server {
udp_tmtc_server.periodic_operation();
}
tmtc_task.periodic_operation();
if tmtc_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
@ -280,9 +289,11 @@ fn main() {
.spawn(move || {
info!("Running TCP SPP client");
loop {
let result = tcp_spp_client.operation();
if let Err(e) = result {
log::error!("TCP SPP client error: {}", e);
match tcp_spp_client.operation() {
Ok(_result) => (),
Err(e) => {
log::error!("TCP SPP client error: {}", e);
}
}
if tcp_client_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
@ -305,10 +316,14 @@ fn main() {
.unwrap();
info!("Starting event handling task");
let event_stop_signal = stop_signal.clone();
let jh_event_handling = thread::Builder::new()
.name("sat-rs events".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
if event_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();

View File

@ -1,25 +1,25 @@
use log::{debug, error, warn};
use log::warn;
use ops_sat_rs::config::components::PUS_ACTION_SERVICE;
use ops_sat_rs::config::tmtc_err;
use ops_sat_rs::TimeStampHelper;
use satrs::action::{ActionRequest, ActionRequestVariant};
use satrs::params::WritableToBeBytes;
use satrs::pus::action::{
ActionReplyPus, ActionReplyVariant, ActivePusActionRequestStd, DefaultActiveActionRequestMap,
};
use satrs::pus::verification::{
FailParams, FailParamsWithStep, TcStateAccepted, TcStateStarted, VerificationReporter,
handle_completion_failure_with_generic_params, handle_step_failure_with_generic_params,
FailParamHelper, FailParams, TcStateAccepted, TcStateStarted, VerificationReporter,
VerificationReportingProvider, VerificationToken,
};
use satrs::pus::{
ActiveRequestProvider, EcssTcAndToken, EcssTcInVecConverter, EcssTmSender, EcssTmtcError,
GenericConversionError, PusPacketHandlerResult, PusReplyHandler, PusServiceHelper,
PusTcToRequestConverter, PusTmVariant,
GenericConversionError, HandlingStatus, PusPacketHandlingError, PusReplyHandler,
PusServiceHelper, PusTcToRequestConverter, PusTmVariant,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::tm::{PusTmCreator, PusTmSecondaryHeader};
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket};
use satrs::spacepackets::ecss::{EcssEnumU16, PusPacket, PusServiceId};
use satrs::spacepackets::SpHeader;
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
@ -28,8 +28,8 @@ use std::time::Duration;
use crate::requests::GenericRequestRouter;
use super::{
create_verification_reporter, generic_pus_request_timeout_handler, HandlingStatus,
PusTargetedRequestService, TargetedPusService,
create_verification_reporter, generic_pus_request_timeout_handler, PusTargetedRequestService,
TargetedPusService,
};
pub const DATA_REPLY: u8 = 130;
@ -64,7 +64,7 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPus> for ActionReplyH
active_request: &ActivePusActionRequestStd,
tm_sender: &(impl EcssTmSender + ?Sized),
verification_handler: &impl VerificationReportingProvider,
time_stamp: &[u8],
timestamp: &[u8],
) -> Result<bool, Self::Error> {
let verif_token: VerificationToken<TcStateStarted> = active_request
.token()
@ -72,15 +72,23 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPus> for ActionReplyH
.expect("invalid token state");
let remove_entry = match &reply.message.variant {
ActionReplyVariant::CompletionFailed { error_code, params } => {
let mut fail_data_len = 0;
if let Some(params) = params {
fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
}
verification_handler.completion_failure(
let error_propagated = handle_completion_failure_with_generic_params(
tm_sender,
verif_token,
FailParams::new(time_stamp, error_code, &self.fail_data_buf[..fail_data_len]),
verification_handler,
FailParamHelper {
error_code,
params: params.as_ref(),
timestamp,
small_data_buf: &mut self.fail_data_buf,
},
)?;
if !error_propagated {
log::warn!(
"error params for completion failure were not propated: {:?}",
params.as_ref()
);
}
true
}
ActionReplyVariant::StepFailed {
@ -88,31 +96,35 @@ impl PusReplyHandler<ActivePusActionRequestStd, ActionReplyPus> for ActionReplyH
step,
params,
} => {
let mut fail_data_len = 0;
if let Some(params) = params {
fail_data_len = params.write_to_be_bytes(&mut self.fail_data_buf)?;
}
verification_handler.step_failure(
let error_propagated = handle_step_failure_with_generic_params(
tm_sender,
verif_token,
FailParamsWithStep::new(
time_stamp,
&EcssEnumU16::new(*step),
verification_handler,
FailParamHelper {
error_code,
&self.fail_data_buf[..fail_data_len],
),
params: params.as_ref(),
timestamp,
small_data_buf: &mut self.fail_data_buf,
},
&EcssEnumU16::new(*step),
)?;
if !error_propagated {
log::warn!(
"error params for completion failure were not propated: {:?}",
params.as_ref()
);
}
true
}
ActionReplyVariant::Completed => {
verification_handler.completion_success(tm_sender, verif_token, time_stamp)?;
verification_handler.completion_success(tm_sender, verif_token, timestamp)?;
true
}
ActionReplyVariant::StepSuccess { step } => {
verification_handler.step_success(
tm_sender,
&verif_token,
time_stamp,
timestamp,
EcssEnumU16::new(*step),
)?;
false
@ -209,7 +221,7 @@ pub fn create_action_service(
PUS_ACTION_SERVICE.id(),
pus_action_rx,
tm_funnel_tx,
create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid),
create_verification_reporter(PUS_ACTION_SERVICE.id(), PUS_ACTION_SERVICE.apid, 2048),
EcssTcInVecConverter::default(),
),
ActionRequestConverter::default(),
@ -236,48 +248,29 @@ pub struct ActionServiceWrapper {
}
impl TargetedPusService for ActionServiceWrapper {
/// Returns [true] if the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 8 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 8 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 8 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
return HandlingStatus::Empty;
}
const SERVICE_ID: u8 = PusServiceId::Action as u8;
const SERVICE_STR: &'static str = "action";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
HandlingStatus::HandledOne
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 8: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
pub fn send_data_reply<TmSender: EcssTmSender>(
apid_target: UniqueApidTargetId,
reply_data: Vec<u8>,
reply_data: &Vec<u8>,
stamp_helper: &TimeStampHelper,
tm_sender: &TmSender,
) -> Result<(), EcssTmtcError> {
@ -287,8 +280,8 @@ pub fn send_data_reply<TmSender: EcssTmSender>(
data.extend(apid_target.apid.to_be_bytes());
data.extend(apid_target.unique_id.to_be_bytes());
data.extend(reply_data);
debug!(
"{}",
log::trace!(
"PUS action reply: {}",
String::from_utf8(data.clone()[6..].to_vec()).expect("Error decoding data reply.")
);
let data_reply_tm = PusTmCreator::new(sp_header, sec_header, &data, true);
@ -300,8 +293,8 @@ mod tests {
use satrs::pus::test_util::{
TEST_APID, TEST_COMPONENT_ID_0, TEST_COMPONENT_ID_1, TEST_UNIQUE_ID_0, TEST_UNIQUE_ID_1,
};
use satrs::pus::verification;
use satrs::pus::verification::test_util::TestVerificationReporter;
use satrs::pus::{verification, TcInMemory};
use satrs::request::MessageMetadata;
use satrs::ComponentId;
use satrs::{
@ -406,7 +399,7 @@ mod tests {
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::RequestHandled => (),
HandlingStatus::HandledOne => (),
_ => panic!("unexpected result {result:?}"),
}
}
@ -418,19 +411,19 @@ mod tests {
}
let result = result.unwrap();
match result {
PusPacketHandlerResult::Empty => (),
HandlingStatus::Empty => (),
_ => panic!("unexpected result {result:?}"),
}
}
pub fn verify_next_reply_is_handled_properly(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::HandledOne);
}
pub fn verify_all_replies_handled(&mut self, time_stamp: &[u8]) {
let result = self.service.poll_and_check_next_reply(time_stamp);
let result = self.service.poll_and_handle_next_reply(time_stamp);
assert!(result.is_ok());
assert_eq!(result.unwrap(), HandlingStatus::Empty);
}
@ -455,11 +448,7 @@ mod tests {
.check_next_is_acceptance_success(id, accepted_token.request_id());
self.pus_packet_tx
.send(EcssTcAndToken::new(
TcInMemory::Vec(PacketAsVec::new(
self.service.service_helper.id(),
//tc.to_vec().unwrap().into(),
tc.to_vec().unwrap(),
)),
PacketAsVec::new(self.service.service_helper.id(), tc.to_vec().unwrap()),
accepted_token,
))
.unwrap();

View File

@ -1,15 +1,16 @@
use std::sync::mpsc;
use super::HandlingStatus;
use super::{DirectPusService, HandlingStatus};
use crate::pus::create_verification_reporter;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver,
PartialPusHandlingError, PusServiceHelper,
};
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
pub fn create_event_service(
@ -22,7 +23,7 @@ pub fn create_event_service(
PUS_EVENT_MANAGEMENT.id(),
pus_event_rx,
tm_funnel_tx,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid, 16),
EcssTcInVecConverter::default(),
),
event_request_tx,
@ -41,24 +42,50 @@ pub struct EventServiceWrapper {
>,
}
impl EventServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.handler.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 5 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
impl DirectPusService for EventServiceWrapper {
const SERVICE_ID: u8 = PusServiceId::Event as u8;
const SERVICE_STR: &'static str = "events";
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self
.handler
.poll_and_handle_next_tc(error_handler, time_stamp);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops.
return HandlingStatus::Empty;
}
match result.unwrap() {
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
}
HandlingStatus::HandledOne

View File

@ -1,5 +1,4 @@
use derive_new::new;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_HK_SERVICE;
use ops_sat_rs::config::{hk_err, tmtc_err};
use satrs::hk::{CollectionIntervalFactor, HkRequest, HkRequestVariant, UniqueId};
@ -10,7 +9,7 @@ use satrs::pus::verification::{
use satrs::pus::{
ActivePusRequestStd, ActiveRequestProvider, DefaultActiveRequestMap, EcssTcAndToken,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError,
PusPacketHandlerResult, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
PusPacketHandlingError, PusReplyHandler, PusServiceHelper, PusTcToRequestConverter,
};
use satrs::request::{GenericMessage, UniqueApidTargetId};
use satrs::spacepackets::ecss::tc::PusTcReader;
@ -22,7 +21,7 @@ use std::time::Duration;
use crate::pus::{create_verification_reporter, generic_pus_request_timeout_handler};
use crate::requests::GenericRequestRouter;
use super::{HandlingStatus, PusTargetedRequestService};
use super::{HandlingStatus, PusTargetedRequestService, TargetedPusService};
#[derive(Clone, PartialEq, Debug, new)]
pub struct HkReply {
@ -241,7 +240,7 @@ pub fn create_hk_service(
PUS_HK_SERVICE.id(),
pus_hk_rx,
tm_funnel_tx,
create_verification_reporter(PUS_HK_SERVICE.id(), PUS_HK_SERVICE.apid),
create_verification_reporter(PUS_HK_SERVICE.id(), PUS_HK_SERVICE.apid, 16),
EcssTcInVecConverter::default(),
),
HkRequestConverter::default(),
@ -267,43 +266,25 @@ pub struct HkServiceWrapper {
>,
}
impl HkServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 3 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 3 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 3 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}");
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
impl TargetedPusService for HkServiceWrapper {
const SERVICE_ID: u8 = 3;
const SERVICE_STR: &'static str = "housekeeping";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
HandlingStatus::HandledOne
}
pub fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS 3: Handling reply failed with error {e:?}");
HandlingStatus::Empty
})
}
pub fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}

View File

@ -18,8 +18,8 @@ use satrs::pus::verification::{
use satrs::pus::{
ActiveRequestMapProvider, ActiveRequestProvider, EcssTcAndToken, EcssTcInMemConverter,
EcssTcInVecConverter, EcssTmSender, EcssTmtcError, GenericConversionError, GenericRoutingError,
MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlerResult, PusPacketHandlingError,
PusReplyHandler, PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
HandlingStatus, MpscTcReceiver, MpscTmAsVecSender, PusPacketHandlingError, PusReplyHandler,
PusRequestRouter, PusServiceHelper, PusTcToRequestConverter, TcInMemory,
};
use satrs::queue::{GenericReceiveError, GenericSendError};
use satrs::request::{Apid, GenericMessage, MessageMetadata};
@ -30,15 +30,12 @@ use satrs::ComponentId;
use std::fmt::Debug;
use std::sync::mpsc::{self, Sender};
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[allow(dead_code)]
pub enum HandlingStatus {
Empty,
HandledOne,
}
pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> VerificationReporter {
let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, 8).unwrap();
pub fn create_verification_reporter(
owner_id: ComponentId,
apid: Apid,
max_fail_data_len: usize,
) -> VerificationReporter {
let verif_cfg = VerificationReporterCfg::new(apid, 1, 2, max_fail_data_len).unwrap();
// Every software component which needs to generate verification telemetry, gets a cloned
// verification reporter.
VerificationReporter::new(owner_id, &verif_cfg)
@ -70,6 +67,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
verif_reporter: create_verification_reporter(
PUS_ROUTING_SERVICE.id(),
PUS_ROUTING_SERVICE.apid,
16,
),
pus_router,
stamp_helper: TimeStampHelper::default(),
@ -80,7 +78,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
&mut self,
sender_id: ComponentId,
tc: Vec<u8>,
) -> Result<PusPacketHandlerResult, GenericSendError> {
) -> Result<HandlingStatus, GenericSendError> {
let pus_tc_result = PusTcReader::new(&tc);
if pus_tc_result.is_err() {
log::warn!(
@ -89,7 +87,7 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
pus_tc_result.unwrap_err()
);
log::warn!("raw data: {:x?}", tc);
return Ok(PusPacketHandlerResult::RequestHandled);
return Ok(HandlingStatus::HandledOne);
}
let pus_tc = pus_tc_result.unwrap().0;
let init_token = self.verif_reporter.add_tc(&pus_tc);
@ -161,17 +159,65 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
}
}
}
Ok(PusPacketHandlerResult::RequestHandled)
Ok(HandlingStatus::HandledOne)
}
}
pub trait TargetedPusService {
/// Returns [true] interface the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus;
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus;
const SERVICE_ID: u8;
const SERVICE_STR: &'static str;
fn poll_and_handle_next_tc_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let result = self.poll_and_handle_next_tc(time_stamp);
if let Err(e) = result {
log::error!(
"PUS service {}({}) packet handling error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
result.unwrap()
}
fn poll_and_handle_next_reply_default_handler(&mut self, time_stamp: &[u8]) -> HandlingStatus {
// This only fails if all senders disconnected. Treat it like an empty queue.
self.poll_and_handle_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!(
"PUS service {}({}): handling reply failed with error {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
HandlingStatus::Empty
})
}
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
/// Generic trait for services which handle packets directly. Kept minimal right now because
/// of the difficulty to allow flexible user code for these services..
pub trait DirectPusService {
const SERVICE_ID: u8;
const SERVICE_STR: &'static str;
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus;
}
/// This is a generic handlers class for all PUS services where a PUS telecommand is converted
/// to a targeted request.
///
@ -264,10 +310,10 @@ where
pub fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<PusPacketHandlerResult, PusPacketHandlingError> {
) -> Result<HandlingStatus, PusPacketHandlingError> {
let possible_packet = self.service_helper.retrieve_and_accept_next_packet()?;
if possible_packet.is_none() {
return Ok(PusPacketHandlerResult::Empty);
return Ok(HandlingStatus::Empty);
}
let ecss_tc_and_token = possible_packet.unwrap();
self.service_helper
@ -323,7 +369,7 @@ where
return Err(e.into());
}
}
Ok(PusPacketHandlerResult::RequestHandled)
Ok(HandlingStatus::HandledOne)
}
fn handle_conversion_to_request_error(
@ -376,7 +422,7 @@ where
}
}
pub fn poll_and_check_next_reply(
pub fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError> {
@ -406,20 +452,17 @@ where
return Ok(());
}
let active_request = active_req_opt.unwrap();
let request_finished = self
.reply_handler
.handle_reply(
reply,
active_request,
&self.service_helper.common.tm_sender,
&self.service_helper.common.verif_reporter,
time_stamp,
)
.unwrap_or(false);
if request_finished {
let result = self.reply_handler.handle_reply(
reply,
active_request,
&self.service_helper.common.tm_sender,
&self.service_helper.common.verif_reporter,
time_stamp,
);
if result.is_err() || (result.is_ok() && *result.as_ref().unwrap()) {
self.active_request_map.remove(reply.request_id());
}
Ok(())
result.map(|_| ())
}
pub fn check_for_request_timeouts(&mut self) {

View File

@ -1,15 +1,14 @@
use derive_new::new;
use log::{error, warn};
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
use std::time::Duration;
use crate::requests::GenericRequestRouter;
use ops_sat_rs::config::components::PUS_MODE_SERVICE;
use ops_sat_rs::config::{mode_err, tmtc_err};
use ops_sat_rs::config::{mode_err, tmtc_err, CustomPusServiceId};
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlerResult,
DefaultActiveRequestMap, EcssTcAndToken, EcssTcInVecConverter, PusPacketHandlingError,
PusServiceHelper,
};
use satrs::request::GenericMessage;
@ -212,7 +211,7 @@ pub fn create_mode_service(
PUS_MODE_SERVICE.id(),
pus_action_rx,
tm_funnel_tx,
create_verification_reporter(PUS_MODE_SERVICE.id(), PUS_MODE_SERVICE.apid),
create_verification_reporter(PUS_MODE_SERVICE.id(), PUS_MODE_SERVICE.apid, 16),
EcssTcInVecConverter::default(),
),
ModeRequestConverter::default(),
@ -239,44 +238,27 @@ pub struct ModeServiceWrapper {
}
impl TargetedPusService for ModeServiceWrapper {
/// Returns [true] if the packet handling is finished.
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.service.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS mode service: partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS mode service: invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS mode service: {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS mode service: packet handling error: {error:?}");
// To avoid permanent loops on error cases.
return HandlingStatus::Empty;
}
const SERVICE_ID: u8 = CustomPusServiceId::Mode as u8;
const SERVICE_STR: &'static str = "mode";
delegate::delegate! {
to self.service {
fn poll_and_handle_next_tc(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, PusPacketHandlingError>;
fn poll_and_handle_next_reply(
&mut self,
time_stamp: &[u8],
) -> Result<HandlingStatus, EcssTmtcError>;
fn check_for_request_timeouts(&mut self);
}
HandlingStatus::HandledOne
}
fn poll_and_handle_next_reply(&mut self, time_stamp: &[u8]) -> HandlingStatus {
self.service
.poll_and_check_next_reply(time_stamp)
.unwrap_or_else(|e| {
warn!("PUS action service: Handling reply failed with error {e:?}");
HandlingStatus::HandledOne
})
}
fn check_for_request_timeouts(&mut self) {
self.service.check_for_request_timeouts();
}
}
#[cfg(test)]
mod tests {
use ops_sat_rs::config::tmtc_err;

View File

@ -2,65 +2,20 @@ use std::sync::mpsc;
use std::time::Duration;
use crate::pus::create_verification_reporter;
use log::{error, info, warn};
use log::info;
use ops_sat_rs::config::components::PUS_SCHEDULER_SERVICE;
use satrs::pool::{PoolProvider, StaticMemoryPool};
use satrs::pool::StaticMemoryPool;
use satrs::pus::scheduler::{PusScheduler, TcInfo};
use satrs::pus::scheduler_srv::PusSchedServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
MpscTcReceiver, PartialPusHandlingError, PusServiceHelper,
};
use satrs::tmtc::{PacketAsVec, PacketInPool, PacketSenderWithSharedPool};
use satrs::ComponentId;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
use super::HandlingStatus;
pub trait TcReleaser {
fn release(&mut self, sender_id: ComponentId, enabled: bool, info: &TcInfo, tc: &[u8]) -> bool;
}
impl TcReleaser for PacketSenderWithSharedPool {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
let shared_pool = self.shared_pool.get_mut();
// Transfer TC from scheduler TC pool to shared TC pool.
let released_tc_addr = shared_pool
.0
.write()
.expect("locking pool failed")
.add(tc)
.expect("adding TC to shared pool failed");
self.sender
.send(PacketInPool::new(sender_id, released_tc_addr))
.expect("sending TC to TC source failed");
}
true
}
}
impl TcReleaser for mpsc::Sender<PacketAsVec> {
fn release(
&mut self,
sender_id: ComponentId,
enabled: bool,
_info: &TcInfo,
tc: &[u8],
) -> bool {
if enabled {
// Send released TC to centralized TC source.
self.send(PacketAsVec::new(sender_id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
}
}
use super::DirectPusService;
pub struct SchedulingService {
pub pus_11_handler: PusSchedServiceHandler<
@ -72,14 +27,73 @@ pub struct SchedulingService {
>,
pub sched_tc_pool: StaticMemoryPool,
pub releaser_buf: [u8; 4096],
pub tc_releaser: Box<dyn TcReleaser + Send>,
pub tc_releaser: mpsc::Sender<PacketAsVec>,
}
impl DirectPusService for SchedulingService {
const SERVICE_ID: u8 = PusServiceId::Verification as u8;
const SERVICE_STR: &'static str = "verification";
fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let result = self.pus_11_handler.poll_and_handle_next_tc(
error_handler,
time_stamp,
&mut self.sched_tc_pool,
);
if let Err(e) = result {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops.
return HandlingStatus::Empty;
}
match result.unwrap() {
DirectPusPacketHandlerResult::Handled(handling_status) => return handling_status,
DirectPusPacketHandlerResult::CustomSubservice(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
}
HandlingStatus::HandledOne
}
}
impl SchedulingService {
pub fn release_tcs(&mut self) {
let id = self.pus_11_handler.service_helper.id();
let releaser = |enabled: bool, info: &TcInfo, tc: &[u8]| -> bool {
self.tc_releaser.release(id, enabled, info, tc)
let releaser = |enabled: bool, _info: &TcInfo, tc: &[u8]| -> bool {
if enabled {
// Send released TC to centralized TC source.
self.tc_releaser
.send(PacketAsVec::new(id, tc.to_vec()))
.expect("sending TC to TC source failed");
}
true
};
self.pus_11_handler
@ -99,31 +113,6 @@ impl SchedulingService {
info!("{released_tcs} TC(s) released from scheduler");
}
}
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self
.pus_11_handler
.poll_and_handle_next_tc(time_stamp, &mut self.sched_tc_pool)
{
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS11 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS11 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS11: Subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
HandlingStatus::HandledOne
}
}
pub fn create_scheduler_service(
@ -139,7 +128,11 @@ pub fn create_scheduler_service(
PUS_SCHEDULER_SERVICE.id(),
pus_sched_rx,
tm_funnel_tx,
create_verification_reporter(PUS_SCHEDULER_SERVICE.id(), PUS_SCHEDULER_SERVICE.apid),
create_verification_reporter(
PUS_SCHEDULER_SERVICE.id(),
PUS_SCHEDULER_SERVICE.apid,
16,
),
EcssTcInVecConverter::default(),
),
scheduler,
@ -148,6 +141,6 @@ pub fn create_scheduler_service(
pus_11_handler,
sched_tc_pool,
releaser_buf: [0; 4096],
tc_releaser: Box::new(tc_source_sender),
tc_releaser: tc_source_sender,
}
}

View File

@ -5,7 +5,7 @@ use satrs::spacepackets::time::{cds, TimeWriter};
use super::{
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
mode::ModeServiceWrapper, scheduler::SchedulingService, DirectPusService, TargetedPusService,
};
#[derive(new)]
@ -23,52 +23,28 @@ impl PusStack {
// Release all telecommands which reached their release time before calling the service
// handlers.
self.schedule_srv.release_tcs();
let time_stamp = cds::CdsTime::now_with_u16_days()
let timestamp = cds::CdsTime::now_with_u16_days()
.expect("time stamp generation error")
.to_vec()
.unwrap();
let mut loop_count = 0_u32;
// Hot loop which will run continuously until all request and reply handling is done.
loop {
let mut nothing_to_do = true;
let mut is_srv_finished =
|_srv_id: u8,
tc_handling_status: HandlingStatus,
reply_handling_status: Option<HandlingStatus>| {
if tc_handling_status == HandlingStatus::HandledOne
|| (reply_handling_status.is_some()
&& reply_handling_status.unwrap() == HandlingStatus::HandledOne)
{
nothing_to_do = false;
}
};
is_srv_finished(
17,
self.test_srv.poll_and_handle_next_packet(&time_stamp),
None,
Self::direct_service_checker(&mut self.test_srv, &timestamp, &mut nothing_to_do);
Self::direct_service_checker(&mut self.schedule_srv, &timestamp, &mut nothing_to_do);
Self::direct_service_checker(&mut self.event_srv, &timestamp, &mut nothing_to_do);
Self::targeted_service_checker(
&mut self.action_srv_wrapper,
&timestamp,
&mut nothing_to_do,
);
is_srv_finished(
11,
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
None,
);
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished(
8,
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(
self.action_srv_wrapper
.poll_and_handle_next_reply(&time_stamp),
),
);
is_srv_finished(
3,
self.hk_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
Some(self.hk_srv_wrapper.poll_and_handle_next_reply(&time_stamp)),
);
is_srv_finished(
200,
self.mode_srv.poll_and_handle_next_tc(&time_stamp),
Some(self.mode_srv.poll_and_handle_next_reply(&time_stamp)),
Self::targeted_service_checker(
&mut self.hk_srv_wrapper,
&timestamp,
&mut nothing_to_do,
);
Self::targeted_service_checker(&mut self.mode_srv, &timestamp, &mut nothing_to_do);
if nothing_to_do {
// Timeout checking is only done once.
self.action_srv_wrapper.check_for_request_timeouts();
@ -76,6 +52,37 @@ impl PusStack {
self.mode_srv.check_for_request_timeouts();
break;
}
// Safety mechanism to avoid infinite loops.
loop_count += 1;
if loop_count >= 500 {
log::warn!("reached PUS stack loop count 500, breaking");
break;
}
}
}
pub fn direct_service_checker<S: DirectPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let handling_status = service.poll_and_handle_next_tc(timestamp);
if handling_status == HandlingStatus::HandledOne {
*nothing_to_do = false;
}
}
pub fn targeted_service_checker<S: TargetedPusService>(
service: &mut S,
timestamp: &[u8],
nothing_to_do: &mut bool,
) {
let request_handling = service.poll_and_handle_next_tc_default_handler(timestamp);
let reply_handling = service.poll_and_handle_next_reply_default_handler(timestamp);
if request_handling == HandlingStatus::HandledOne
|| reply_handling == HandlingStatus::HandledOne
{
*nothing_to_do = false;
}
}
}

View File

@ -1,22 +1,20 @@
use crate::pus::create_verification_reporter;
use log::{info, warn};
use log::info;
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{
EcssTcAndToken, EcssTcInMemConverter, EcssTcInVecConverter, MpscTcReceiver, MpscTmAsVecSender,
PusPacketHandlerResult, PusServiceHelper,
DirectPusPacketHandlerResult, EcssTcAndToken, EcssTcInVecConverter, HandlingStatus,
MpscTcReceiver, MpscTmAsVecSender, PartialPusHandlingError, PusServiceHelper,
};
use satrs::spacepackets::ecss::tc::PusTcReader;
use satrs::spacepackets::ecss::PusPacket;
use satrs::spacepackets::time::cds::CdsTime;
use satrs::spacepackets::time::TimeWriter;
use satrs::queue::GenericSendError;
use satrs::spacepackets::ecss::PusServiceId;
use satrs::tmtc::PacketAsVec;
use std::sync::mpsc;
use super::HandlingStatus;
use super::DirectPusService;
pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
@ -27,7 +25,7 @@ pub fn create_test_service(
PUS_TEST_SERVICE.id(),
pus_test_rx,
tm_funnel_tx,
create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid),
create_verification_reporter(PUS_TEST_SERVICE.id(), PUS_TEST_SERVICE.apid, 16),
EcssTcInVecConverter::default(),
));
TestCustomServiceWrapper {
@ -46,61 +44,85 @@ pub struct TestCustomServiceWrapper {
pub event_tx: mpsc::SyncSender<EventMessageU32>,
}
impl TestCustomServiceWrapper {
pub fn poll_and_handle_next_packet(&mut self, time_stamp: &[u8]) -> HandlingStatus {
let res = self.handler.poll_and_handle_next_tc(time_stamp);
if res.is_err() {
warn!("PUS17 handlers failed with error {:?}", res.unwrap_err());
impl DirectPusService for TestCustomServiceWrapper {
const SERVICE_ID: u8 = PusServiceId::Test as u8;
const SERVICE_STR: &'static str = "test";
fn poll_and_handle_next_tc(&mut self, timestamp: &[u8]) -> HandlingStatus {
let error_handler = |partial_error: &PartialPusHandlingError| {
log::warn!(
"PUS {}({}) partial error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
partial_error
);
};
let res = self
.handler
.poll_and_handle_next_tc(error_handler, timestamp);
if let Err(e) = res {
log::warn!(
"PUS {}({}) error: {:?}",
Self::SERVICE_ID,
Self::SERVICE_STR,
e
);
// To avoid permanent loops.
return HandlingStatus::Empty;
}
match res.unwrap() {
PusPacketHandlerResult::RequestHandled => {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
DirectPusPacketHandlerResult::Handled(handling_status) => {
if handling_status == HandlingStatus::HandledOne {
info!("Received PUS ping command TC[17,1]");
info!("Sent ping reply PUS TM[17,2]");
}
return handling_status;
}
PusPacketHandlerResult::RequestHandledPartialSuccess(partial_err) => {
warn!(
"Handled PUS ping command with partial success: {:?}",
partial_err
DirectPusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
log::warn!(
"PUS {}({}) subservice {} not implemented",
Self::SERVICE_ID,
Self::SERVICE_STR,
subservice
);
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS17: Subservice {subservice} not implemented")
}
// TODO: adapt interface events are implemented
PusPacketHandlerResult::CustomSubservice(subservice, token) => {
let (tc, _) = PusTcReader::new(
self.handler
.service_helper
.tc_in_mem_converter
.tc_slice_raw(),
)
.unwrap();
let time_stamper = CdsTime::now_with_u16_days().unwrap();
let mut stamp_buf: [u8; 7] = [0; 7];
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
DirectPusPacketHandlerResult::CustomSubservice(subservice, token) => {
if subservice == 128 {
info!("Generating test event");
self.event_tx
info!("generating test event");
if let Err(e) = self
.event_tx
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
.expect("Sending test event failed");
let start_token = self
.handler
.service_helper
.verif_reporter()
.start_success(self.handler.service_helper.tm_sender(), token, &stamp_buf)
.expect("Error sending start success");
self.handler
.service_helper
.verif_reporter()
.completion_success(
self.handler.service_helper.tm_sender(),
start_token,
&stamp_buf,
)
.expect("Error sending completion success");
.map_err(|_| GenericSendError::RxDisconnected)
{
// This really should not happen but I want to avoid panicking..
log::warn!("failed to send test event: {:?}", e);
}
match self.handler.service_helper.verif_reporter().start_success(
self.handler.service_helper.tm_sender(),
token,
timestamp,
) {
Ok(started_token) => {
if let Err(e) = self
.handler
.service_helper
.verif_reporter()
.completion_success(
self.handler.service_helper.tm_sender(),
started_token,
timestamp,
)
{
error_handler(&PartialPusHandlingError::Verification(e));
}
}
Err(e) => {
error_handler(&PartialPusHandlingError::Verification(e));
}
}
} else {
let fail_data = [tc.subservice()];
let fail_data = [subservice];
self.handler
.service_helper
.verif_reporter()
@ -108,7 +130,7 @@ impl TestCustomServiceWrapper {
self.handler.service_helper.tm_sender(),
token,
FailParams::new(
&stamp_buf,
timestamp,
&tmtc_err::INVALID_PUS_SUBSERVICE,
&fail_data,
),
@ -116,7 +138,6 @@ impl TestCustomServiceWrapper {
.expect("Sending start failure verification failed");
}
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
}
HandlingStatus::HandledOne
}

View File

@ -1,8 +1,11 @@
use std::sync::mpsc::{self, TryRecvError};
use satrs::{pus::MpscTmAsVecSender, tmtc::PacketAsVec};
use satrs::{
pus::{HandlingStatus, MpscTmAsVecSender},
tmtc::PacketAsVec,
};
use crate::pus::{HandlingStatus, PusTcDistributor};
use crate::pus::PusTcDistributor;
// TC source components where the heap is the backing memory of the received telecommands.
pub struct TcSourceTaskDynamic {