clean up python code a bit #18

Merged
muellerr merged 3 commits from some-python-client-cleanup into main 2024-04-24 20:41:03 +02:00
16 changed files with 261 additions and 87 deletions

View File

@ -55,7 +55,20 @@ Commanding of the `ops-sat-rs` application is possible by different means.
### Using the `pyclient` and `pyserver` applications
You can find both commanding application inside the `pytmtc` folder.
You can also find a `requirements.txt` file there to install all required Python dependencies.
It is recommended to set up a virtual environment first, for example by running the following
code inside the `pytmtc` folder:
```sh
python3 -m venv venv
source venv/bin/activate
```
After that, you can install all requirements for both the client and server application
interactively using
```sh
pip install -e .
```
If you want to command the satellite using the OPS-SAT infrastrucute, start the `pyserver.py`
as a background application first, for example by simply running `pyserver.py` inside a

133
pytmtc/.gitignore vendored
View File

@ -8,3 +8,136 @@ __pycache__
/seqcnt.txt
/.tmtc-history.txt
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# PyCharm
.idea

View File

@ -1,31 +0,0 @@
import struct
from serde import Model, fields
from common import EXPERIMENT_APID, UniqueId, make_addressable_id
class CameraParameters(Model):
R: fields.Int()
G: fields.Int()
B: fields.Int()
N: fields.Int()
P: fields.Bool()
E: fields.Int()
W: fields.Int()
def serialize_for_uplink(self) -> bytearray:
return self.to_json().encode('utf-8')
# Example serialization
data = bytearray(make_addressable_id(EXPERIMENT_APID, UniqueId.CameraHandler))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
serialized = params.to_json().encode('utf-8')
byte_string = bytearray(struct.pack('!{}s'.format(len(serialized)), serialized))
print(byte_string)
print(params.serialize_for_uplink())
data.extend(params.serialize_for_uplink())
print(data)
# Example deserialization
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
deserialized_params = CameraParameters.from_json(data)
print(deserialized_params)

View File

@ -0,0 +1,17 @@
import struct
from serde import Model, fields
from opssat_tmtc.common import EXPERIMENT_APID, UniqueId, make_unique_id
class CameraParameters(Model):
R: fields.Int()
G: fields.Int()
B: fields.Int()
N: fields.Int()
P: fields.Bool()
E: fields.Int()
W: fields.Int()
def serialize_for_uplink(self) -> bytearray:
return self.to_json().encode("utf-8")

View File

@ -3,14 +3,12 @@ from __future__ import annotations
import dataclasses
import enum
import struct
from serde import Model, fields
EXPERIMENT_ID = 278
EXPERIMENT_APID = 1024 + EXPERIMENT_ID
class UniqueId(enum.IntEnum):
Controller = 0
PusEventManagement = 1
PusRouting = 2
@ -59,14 +57,11 @@ class AcsHkIds(enum.IntEnum):
MGM_SET = 1
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", unique_id))
# byte_string = bytearray(struct.pack("!I", target_id))
# byte_string.extend(struct.pack("!I", unique_id))
return byte_string
def make_unique_id(unique_id: int) -> bytes:
return struct.pack("!I", unique_id)
def make_addressable_id_with_action_id(unique_id: int, action_id: int) -> bytes:
def make_action_cmd_header(unique_id: int, action_id: int) -> bytes:
byte_string = bytearray(struct.pack("!I", unique_id))
byte_string.extend(struct.pack("!I", action_id))
return byte_string

View File

@ -9,10 +9,13 @@ from tmtccmd.pus.tc.s200_fsfw_mode import Mode
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
from serde import Model, fields
from camera_params import CameraParameters
from common import EXPERIMENT_APID, UniqueId, make_addressable_id, make_addressable_id_with_action_id
from opssat_tmtc.camera_params import CameraParameters
from opssat_tmtc.common import (
EXPERIMENT_APID,
UniqueId,
make_action_cmd_header,
)
_LOGGER = logging.getLogger(__name__)
@ -70,33 +73,27 @@ def create_cmd_definition_tree() -> CmdTreeNode:
root_node.add_child(scheduler_node)
action_node = CmdTreeNode("action", "Action Node")
cam_node = CmdTreeNode(
"take_image", "Take Image with IMS Imager"
)
cam_node = CmdTreeNode("take_image", "Take Image with IMS Imager")
cam_node.add_child(
CmdTreeNode("default_single", "Default Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode("balanced_single", "Balanced Single Image Camera Parameters")
)
cam_node.add_child(
CmdTreeNode(
"default_single", "Default Single Image Camera Parameters"
"default_single_flatsat",
"Default Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode(
"balanced_single", "Balanced Single Image Camera Parameters"
"balanced_single_flatsat",
"Balanced Single Image Camera Parameters for use on FlatSat",
)
)
cam_node.add_child(
CmdTreeNode(
"default_single_flatsat", "Default Single Image Camera Parameters for use on FlatSat"
)
)
cam_node.add_child(
CmdTreeNode(
"balanced_single_flatsat", "Balanced Single Image Camera Parameters for use on FlatSat"
)
)
cam_node.add_child(
CmdTreeNode(
"custom_params", "Custom Camera Parameters as specified from file"
)
CmdTreeNode("custom_params", "Custom Camera Parameters as specified from file")
)
action_node.add_child(cam_node)
root_node.add_child(action_node)
@ -134,25 +131,32 @@ def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
if cmd_path_list[0] == "acs":
assert len(cmd_path_list) >= 2
if cmd_path_list[0] == "action":
assert len(cmd_path_list)>= 2
assert len(cmd_path_list) >= 2
if cmd_path_list[1] == "take_image":
assert len(cmd_path_list)>= 3
q.add_log_cmd("Sending PUS take image action request with " + cmd_path_list[2] + " params.")
assert len(cmd_path_list) >= 3
q.add_log_cmd(
"Sending PUS take image action request with "
+ cmd_path_list[2]
+ " params."
)
data = bytearray()
if cmd_path_list[2] == "default_single":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 1)
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 1))
if cmd_path_list[2] == "balanced_single":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 2)
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 2))
if cmd_path_list[2] == "default_single_flatsat":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 3)
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 3))
if cmd_path_list[2] == "balanced_single_flatsat":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 4)
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 4))
if cmd_path_list[2] == "custom":
data = make_addressable_id_with_action_id(UniqueId.CameraHandler, 5)
data.extend(make_action_cmd_header(UniqueId.CameraHandler, 5))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
bytes = params.serialize_for_uplink()
data.extend(bytes)
print(data.hex(sep=","))
return q.add_pus_tc(PusTelecommand(service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data))
data.extend(params.serialize_for_uplink())
return q.add_pus_tc(
PusTelecommand(
service=8, subservice=128, apid=EXPERIMENT_APID, app_data=data
)
)
def handle_set_mode_cmd(

View File

View File

@ -1,7 +1,7 @@
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper, CoreServiceList
from tmtccmd.config.globals import get_default_tmtc_defs
from common import HkOpCodes
from opssat_tmtc.common import HkOpCodes
def tc_definitions() -> TmtcDefinitionWrapper:
@ -41,6 +41,5 @@ def tc_definitions() -> TmtcDefinitionWrapper:
name=CoreServiceList.SERVICE_8,
info="PUS Service 8 Action",
op_code_entry=srv_8,
)
return defs

View File

@ -46,9 +46,8 @@ from tmtccmd.tmtc import (
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
import pus_tc
from common import EXPERIMENT_APID, EventU32
from opssat_tmtc.pus_tc import create_cmd_definition_tree, pack_pus_telecommands
from opssat_tmtc.common import EXPERIMENT_APID, EventU32
_LOGGER = logging.getLogger()
@ -76,7 +75,7 @@ class SatRsConfigHook(HookBase):
def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree."""
return pus_tc.create_cmd_definition_tree()
return create_cmd_definition_tree()
def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used
@ -149,10 +148,10 @@ class PusHandler(GenericApidHandlerBase):
_LOGGER.info("Received test event")
elif service == 8:
if pus_tm.subservice == 130:
_LOGGER.info(f"Received Action Data Reply TM[8,130]")
_LOGGER.info("Received Action Data Reply TM[8,130]")
reply = pus_tm.source_data
reply = reply[6:]
_LOGGER.info(f"Data Reply Content: " + reply.decode('utf-8'))
_LOGGER.info(f"Data Reply Content: {reply.decode()}")
elif service == 17:
tm_packet = Service17Tm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
@ -217,7 +216,7 @@ class TcHandler(TcHandlerBase):
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
assert def_proc.cmd_path is not None
pus_tc.pack_pus_telecommands(q, def_proc.cmd_path)
pack_pus_telecommands(q, def_proc.cmd_path)
def main():

26
pytmtc/pyproject.toml Normal file
View File

@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "opssat-tmtc"
description = "Python TMTC client for OPS-SAT"
readme = "README.md"
version = "0.1.0"
requires-python = ">=3.8"
authors = [
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"},
{name = "Linus Köster", email = "st167799@stud.uni-stuttgart.de"}
]
dependencies = [
"tmtccmd==8.0.0rc.2",
"serde==0.9.0"
]
[tool.setuptools.packages]
find = {}
[tool.ruff.lint]
ignore = ["E501"]
[tool.ruff.lint.extend-per-file-ignores]
"__init__.py" = ["F401"]

View File

@ -17,7 +17,7 @@ EXP_PACKET_ID_TM = PacketId(PacketType.TM, True, EXP_APID)
EXP_PACKET_ID_TC = PacketId(PacketType.TC, True, EXP_APID)
OPSSAT_SERVER_PORT = 4096
TMTC_SERVER_PORT = 4097
LOG_LEVEL = logging.DEBUG
LOG_LEVEL = logging.INFO
TC_QUEUE = Queue()

View File

@ -1,2 +1,2 @@
tmtccmd == 8.0.0rc2
.
# -e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@ -0,0 +1,20 @@
import struct
from opssat_tmtc.camera_params import CameraParameters
from opssat_tmtc.common import make_unique_id, EXPERIMENT_APID
def test_serde_serialization():
# Example serializatn
data = bytearray(make_unique_id(EXPERIMENT_APID))
params = CameraParameters(8, 8, 8, 1, True, 200, 1000)
serialized = params.to_json().encode("utf-8")
byte_string = bytearray(struct.pack("!{}s".format(len(serialized)), serialized))
print(byte_string)
print(params.serialize_for_uplink())
data.extend(params.serialize_for_uplink())
print(data)
# Example deserialization
data = '{"R": 100, "G": 150, "B": 200, "N": 3, "P": true, "E": 10, "W": 20}'
deserialized_params = CameraParameters.from_json(data)
print(deserialized_params)

View File

@ -22,8 +22,8 @@ pub const VALID_PACKET_ID_LIST: &[PacketId] = &[PacketId::new_for_tc(true, EXPER
// TODO: Would be nice if this can be commanded as well..
/// Can be enabled to print all SPP packets received from the SPP server on port 4096.
pub const SPP_CLIENT_WIRETAPPING_RX: bool = true;
pub const SPP_CLIENT_WIRETAPPING_TX: bool = true;
pub const SPP_CLIENT_WIRETAPPING_RX: bool = false;
pub const SPP_CLIENT_WIRETAPPING_TX: bool = false;
#[derive(Copy, Clone, PartialEq, Eq, Debug, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]

View File

@ -8,7 +8,6 @@ pub fn setup_logger() -> Result<(), fern::InitError> {
}
let mut path_buf = PathBuf::from(LOG_FOLDER);
path_buf.push("output.log");
println!("{:?}", path_buf);
fern::Dispatch::new()
.format(move |out, message, record| {
out.finish(format_args!(