Merge pull request 'clean up python code a bit' (#18) from some-python-client-cleanup into main

Reviewed-on: #18
This commit is contained in:
Robin Müller 2024-04-24 20:41:02 +02:00
commit 404d1c1c1e
16 changed files with 261 additions and 87 deletions

View File

@ -55,7 +55,20 @@ Commanding of the `ops-sat-rs` application is possible by different means.
### Using the `pyclient` and `pyserver` applications ### Using the `pyclient` and `pyserver` applications
You can find both commanding application inside the `pytmtc` folder. You can find both commanding application inside the `pytmtc` folder.
You can also find a `requirements.txt` file there to install all required Python dependencies. It is recommended to set up a virtual environment first, for example by running the following
code inside the `pytmtc` folder:
```sh
python3 -m venv venv
source venv/bin/activate
```
After that, you can install all requirements for both the client and server application
interactively using
```sh
pip install -e .
```
If you want to command the satellite using the OPS-SAT infrastrucute, start the `pyserver.py` If you want to command the satellite using the OPS-SAT infrastrucute, start the `pyserver.py`
as a background application first, for example by simply running `pyserver.py` inside a as a background application first, for example by simply running `pyserver.py` inside a

133
pytmtc/.gitignore vendored
View File

@ -8,3 +8,136 @@ __pycache__
/seqcnt.txt /seqcnt.txt
/.tmtc-history.txt /.tmtc-history.txt
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# PyCharm
.idea

View File

@ -1,31 +0,0 @@
import struct
from serde import Model, fields
from common import EXPERIMENT_APID, UniqueId, make_addressable_id
class CameraParameters(Model):
R: fields.Int()
G: fields.Int()
B: fields.Int()
N: fields.Int()
P: fields.Bool()
E: fields.Int()
W: fields.Int()
def serialize_for_uplink(self) -> bytearray:
return self.to_json().encode('utf-8')
# Example serialization
data = bytearray(make_addressable_id(EXPERIMENT_APID, UniqueId.CameraHandler))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
serialized = params.to_json().encode('utf-8')
byte_string = bytearray(struct.pack('!{}s'.format(len(serialized)), serialized))
print(byte_string)
print(params.serialize_for_uplink())
data.extend(params.serialize_for_uplink())
print(data)
# Example deserialization
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
deserialized_params = CameraParameters.from_json(data)
print(deserialized_params)

View File

@ -0,0 +1,17 @@
import struct
from serde import Model, fields
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_unique_id
class CameraParameters(Model):
R: fields.Int()
G: fields.Int()
B: fields.Int()
N: fields.Int()
P: fields.Bool()
E: fields.Int()
W: fields.Int()
def serialize_for_uplink(self) -> bytearray:
return self.to_json().encode("utf-8")

View File

@ -3,14 +3,12 @@ from __future__ import annotations
import dataclasses import dataclasses
import enum import enum
import struct import struct
from serde import Model, fields
EXPERIMENT_ID = 278 EXPERIMENT_ID = 278
EXPERIMENT_APID = 1024 + EXPERIMENT_ID EXPERIMENT_APID = 1024 + EXPERIMENT_ID
class UniqueId(enum.IntEnum): class UniqueId(enum.IntEnum):
Controller = 0 Controller = 0
PusEventManagement = 1 PusEventManagement = 1
PusRouting = 2 PusRouting = 2
@ -59,14 +57,11 @@ class AcsHkIds(enum.IntEnum):
MGM_SET = 1 MGM_SET = 1
def make_addressable_id(target_id: int, unique_id: int) -> bytes: def make_unique_id(unique_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", unique_id)) return struct.pack("!I", unique_id)
# byte_string = bytearray(struct.pack("!I", target_id))
# byte_string.extend(struct.pack("!I", unique_id))
return byte_string
def make_addressable_id_with_action_id(unique_id: int, action_id: int) -> bytes: def make_action_cmd_header(unique_id: int, action_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", unique_id)) byte_string = bytearray(struct.pack("!I", unique_id))
byte_string.extend(struct.pack("!I", action_id)) byte_string.extend(struct.pack("!I", action_id))
return byte_string return byte_string

View File

@ -9,10 +9,13 @@ from tmtccmd.pus.tc.s200_fsfw_mode import Mode
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
from serde import Model, fields
from camera_params import CameraParameters from opssat_tmtc.camera_params import CameraParameters
from common import EXPERIMENT_APID, UniqueId, make_addressable_id, make_addressable_id_with_action_id from opssat_tmtc.common import (
EXPERIMENT_APID,
UniqueId,
make_action_cmd_header,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -70,33 +73,27 @@ def create_cmd_definition_tree() -> CmdTreeNode:
root_node.add_child(scheduler_node) root_node.add_child(scheduler_node)
action_node = CmdTreeNode("action", "Action Node") action_node = CmdTreeNode("action", "Action Node")
cam_node = CmdTreeNode( cam_node = CmdTreeNode("take_image", "Take Image with IMS Imager")
"take_image", "Take Image with IMS Imager" cam_node.add_child(
) CmdTreeNode("default_single", "Default Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
)
cam_node.add_child( cam_node.add_child(
CmdTreeNode( CmdTreeNode(
"default_single", "Default Single Image Camera Parameters" "default_single_flatsat",
"Default Single Image Camera Parameters for use on FlatSat",
) )
) )
cam_node.add_child( cam_node.add_child(
CmdTreeNode( CmdTreeNode(
"balanced_single", "Balanced Single Image Camera Parameters" "balanced_single_flatsat",
"Balanced Single Image Camera Parameters for use on FlatSat",
) )
) )
cam_node.add_child( cam_node.add_child(
CmdTreeNode( CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
"default_single_flatsat", "Default Single Image Camera Parameters for use on FlatSat"
)
)
cam_node.add_child(
CmdTreeNode(
"balanced_single_flatsat", "Balanced Single Image Camera Parameters for use on FlatSat"
)
)
cam_node.add_child(
CmdTreeNode(
"custom_params", "Custom Camera Parameters as specified from file"
)
) )
action_node.add_child(cam_node) action_node.add_child(cam_node)
root_node.add_child(action_node) root_node.add_child(action_node)
@ -134,25 +131,32 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
if cmd_path_list[0] == "acs": if cmd_path_list[0] == "acs":
assert len(cmd_path_list) >= 2 assert len(cmd_path_list) >= 2
if cmd_path_list[0] == "action": if cmd_path_list[0] == "action":
assert len(cmd_path_list)>= 2 assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "take_image": if cmd_path_list[1] == "take_image":
assert len(cmd_path_list)>= 3 assert len(cmd_path_list) >= 3
q.add_log_cmd("Sending PUS take image action request with " + cmd_path_list[2] + " params.") q.add_log_cmd(
"Sending PUS take image action request with "
+ cmd_path_list[2]
+ " params."
)
data = bytearray()
if cmd_path_list[2] == "default_single": if cmd_path_list[2] == "default_single":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 1) data.extend(make_action_cmd_header(UniqueId.CameraHandler, 1))
if cmd_path_list[2] == "balanced_single": if cmd_path_list[2] == "balanced_single":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 2) data.extend(make_action_cmd_header(UniqueId.CameraHandler, 2))
if cmd_path_list[2] == "default_single_flatsat": if cmd_path_list[2] == "default_single_flatsat":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 3) data.extend(make_action_cmd_header(UniqueId.CameraHandler, 3))
if cmd_path_list[2] == "balanced_single_flatsat": if cmd_path_list[2] == "balanced_single_flatsat":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 4) data.extend(make_action_cmd_header(UniqueId.CameraHandler, 4))
if cmd_path_list[2] == "custom": if cmd_path_list[2] == "custom":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 5) data.extend(make_action_cmd_header(UniqueId.CameraHandler, 5))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000) params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
bytes = params.serialize_for_uplink() data.extend(params.serialize_for_uplink())
data.extend(bytes) return q.add_pus_tc(
print(data.hex(sep=",")) PusTelecommand(
return q.add_pus_tc(PusTelecommand(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data)) service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
)
)
def handle_set_mode_cmd( def handle_set_mode_cmd(

View File

View File

@ -1,7 +1,7 @@
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper, CoreServiceList from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper, CoreServiceList
from tmtccmd.config.globals import get_default_tmtc_defs from tmtccmd.config.globals import get_default_tmtc_defs
from common import HkOpCodes from opssat_tmtc.common import HkOpCodes
def tc_definitions() -> TmtcDefinitionWrapper: def tc_definitions() -> TmtcDefinitionWrapper:
@ -41,6 +41,5 @@ def tc_definitions() -> TmtcDefinitionWrapper:
name=CoreServiceList.SERVICE_8, name=CoreServiceList.SERVICE_8,
info="PUS Service 8 Action", info="PUS Service 8 Action",
op_code_entry=srv_8, op_code_entry=srv_8,
) )
return defs return defs

View File

@ -46,9 +46,8 @@ from tmtccmd.tmtc import (
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT from tmtccmd.util.obj_id import ObjectIdDictT
from opssat_tmtc.pus_tc import create_cmd_definition_tree, pack_pus_telecommands
import pus_tc from opssat_tmtc.common import EXPERIMENT_APID, EventU32
from common import EXPERIMENT_APID, EventU32
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
@ -76,7 +75,7 @@ class SatRsConfigHook(HookBase):
def get_command_definitions(self) -> CmdTreeNode: def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree.""" """This function should return the root node of the command definition tree."""
return pus_tc.create_cmd_definition_tree() return create_cmd_definition_tree()
def get_cmd_history(self) -> Optional[History]: def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used """Optionlly return a history class for the past command paths which will be used
@ -149,10 +148,10 @@ class PusHandler(GenericApidHandlerBase):
_LOGGER.info("Received test event") _LOGGER.info("Received test event")
elif service == 8: elif service == 8:
if pus_tm.subservice == 130: if pus_tm.subservice == 130:
_LOGGER.info(f"Received Action Data Reply TM[8,130]") _LOGGER.info("Received Action Data Reply TM[8,130]")
reply = pus_tm.source_data reply = pus_tm.source_data
reply = reply[6:] reply = reply[6:]
_LOGGER.info(f"Data Reply Content: " + reply.decode('utf-8')) _LOGGER.info(f"Data Reply Content: {reply.decode()}")
elif service == 17: elif service == 17:
tm_packet = Service17Tm.unpack( tm_packet = Service17Tm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
@ -217,7 +216,7 @@ class TcHandler(TcHandlerBase):
if info.proc_type == TcProcedureType.TREE_COMMANDING: if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure() def_proc = info.to_tree_commanding_procedure()
assert def_proc.cmd_path is not None assert def_proc.cmd_path is not None
pus_tc.pack_pus_telecommands(q, def_proc.cmd_path) pack_pus_telecommands(q, def_proc.cmd_path)
def main(): def main():

26
pytmtc/pyproject.toml Normal file
View File

@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "opssat-tmtc"
description = "Python TMTC client for OPS-SAT"
readme = "README.md"
version = "0.1.0"
requires-python = ">=3.8"
authors = [
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"},
{name = "Linus Köster", email = "st167799@stud.uni-stuttgart.de"}
]
dependencies = [
"tmtccmd==8.0.0rc.2",
"serde==0.9.0"
]
[tool.setuptools.packages]
find = {}
[tool.ruff.lint]
ignore = ["E501"]
[tool.ruff.lint.extend-per-file-ignores]
"__init__.py" = ["F401"]

View File

@ -17,7 +17,7 @@ EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID) EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)
OPSSAT_SERVER_PORT = 4096 OPSSAT_SERVER_PORT = 4096
TMTC_SERVER_PORT = 4097 TMTC_SERVER_PORT = 4097
LOG_LEVEL = logging.DEBUG LOG_LEVEL = logging.INFO
TC_QUEUE = Queue() TC_QUEUE = Queue()

View File

@ -1,2 +1,2 @@
tmtccmd == 8.0.0rc2 .
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd # -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@ -0,0 +1,20 @@
import struct
from opssat_tmtc.camera_params import CameraParameters
from opssat_tmtc.common import make_unique_id, EXPERIMENT_APID
def test_serde_serialization():
# Example serializatn
data = bytearray(make_unique_id(EXPERIMENT_APID))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
serialized = params.to_json().encode("utf-8")
byte_string = bytearray(struct.pack("!{}s".format(len(serialized)), serialized))
print(byte_string)
print(params.serialize_for_uplink())
data.extend(params.serialize_for_uplink())
print(data)
# Example deserialization
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
deserialized_params = CameraParameters.from_json(data)
print(deserialized_params)

View File

@ -22,8 +22,8 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER
// TODO: Would be nice if this can be commanded as well.. // TODO: Would be nice if this can be commanded as well..
/// Can be enabled to print all SPP packets received from the SPP server on port 4096. /// Can be enabled to print all SPP packets received from the SPP server on port 4096.
pub const SPP_CLIENT_WIRETAPPING_RX: bool = true; pub const SPP_CLIENT_WIRETAPPING_RX: bool = false;
pub const SPP_CLIENT_WIRETAPPING_TX: bool = true; pub const SPP_CLIENT_WIRETAPPING_TX: bool = false;
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)] #[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)] #[repr(u8)]

View File

@ -8,7 +8,6 @@ pub fn setup_logger() -> Result<(), fern::InitError> {
} }
let mut path_buf = PathBuf::from(LOG_FOLDER); let mut path_buf = PathBuf::from(LOG_FOLDER);
path_buf.push("output.log"); path_buf.push("output.log");
println!("{:?}", path_buf);
fern::Dispatch::new() fern::Dispatch::new()
.format(move |out, message, record| { .format(move |out, message, record| {
out.finish(format_args!( out.finish(format_args!(