fixes #1
+143
-143
@@ -1,143 +1,143 @@
|
||||
/tmtc_conf.json
|
||||
__pycache__
|
||||
|
||||
/venv
|
||||
/log
|
||||
/.idea/*
|
||||
!/.idea/runConfigurations
|
||||
|
||||
/seqcnt.txt
|
||||
/.tmtc-history.txt
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# PyCharm
|
||||
.idea
|
||||
/tmtc_conf.json
|
||||
__pycache__
|
||||
|
||||
/venv
|
||||
/log
|
||||
/.idea/*
|
||||
!/.idea/runConfigurations
|
||||
|
||||
/seqcnt.txt
|
||||
/.tmtc-history.txt
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# PyCharm
|
||||
.idea
|
||||
|
||||
+150
-150
@@ -1,151 +1,151 @@
|
||||
{
|
||||
"excluded_patterns": [
|
||||
".git",
|
||||
".gitignore",
|
||||
"gradle",
|
||||
"gradlew",
|
||||
"gradlew.*",
|
||||
"node_modules",
|
||||
".snapshots",
|
||||
".idea",
|
||||
".vscode",
|
||||
"*.log",
|
||||
"*.tmp",
|
||||
"target",
|
||||
"dist",
|
||||
"build",
|
||||
".DS_Store",
|
||||
"*.bak",
|
||||
"*.swp",
|
||||
"*.swo",
|
||||
"*.lock",
|
||||
"*.iml",
|
||||
"coverage",
|
||||
"*.min.js",
|
||||
"*.min.css",
|
||||
"__pycache__",
|
||||
".marketing",
|
||||
".env",
|
||||
".env.*",
|
||||
"*.jpg",
|
||||
"*.jpeg",
|
||||
"*.png",
|
||||
"*.gif",
|
||||
"*.bmp",
|
||||
"*.tiff",
|
||||
"*.ico",
|
||||
"*.svg",
|
||||
"*.webp",
|
||||
"*.psd",
|
||||
"*.ai",
|
||||
"*.eps",
|
||||
"*.indd",
|
||||
"*.raw",
|
||||
"*.cr2",
|
||||
"*.nef",
|
||||
"*.mp4",
|
||||
"*.mov",
|
||||
"*.avi",
|
||||
"*.wmv",
|
||||
"*.flv",
|
||||
"*.mkv",
|
||||
"*.webm",
|
||||
"*.m4v",
|
||||
"*.wfp",
|
||||
"*.prproj",
|
||||
"*.aep",
|
||||
"*.psb",
|
||||
"*.xcf",
|
||||
"*.sketch",
|
||||
"*.fig",
|
||||
"*.xd",
|
||||
"*.db",
|
||||
"*.sqlite",
|
||||
"*.sqlite3",
|
||||
"*.mdb",
|
||||
"*.accdb",
|
||||
"*.frm",
|
||||
"*.myd",
|
||||
"*.myi",
|
||||
"*.ibd",
|
||||
"*.dbf",
|
||||
"*.rdb",
|
||||
"*.aof",
|
||||
"*.pdb",
|
||||
"*.sdb",
|
||||
"*.s3db",
|
||||
"*.ddb",
|
||||
"*.db-shm",
|
||||
"*.db-wal",
|
||||
"*.sqlitedb",
|
||||
"*.sql.gz",
|
||||
"*.bak.sql",
|
||||
"dump.sql",
|
||||
"dump.rdb",
|
||||
"*.vsix",
|
||||
"*.jar",
|
||||
"*.war",
|
||||
"*.ear",
|
||||
"*.zip",
|
||||
"*.tar",
|
||||
"*.tar.gz",
|
||||
"*.tgz",
|
||||
"*.rar",
|
||||
"*.7z",
|
||||
"*.exe",
|
||||
"*.dll",
|
||||
"*.so",
|
||||
"*.dylib",
|
||||
"*.app",
|
||||
"*.dmg",
|
||||
"*.iso",
|
||||
"*.msi",
|
||||
"*.deb",
|
||||
"*.rpm",
|
||||
"*.apk",
|
||||
"*.aab",
|
||||
"*.ipa",
|
||||
"*.pkg",
|
||||
"*.nupkg",
|
||||
"*.snap",
|
||||
"*.whl",
|
||||
"*.gem",
|
||||
"*.pyc",
|
||||
"*.pyo",
|
||||
"*.pyd",
|
||||
"*.class",
|
||||
"*.o",
|
||||
"*.obj",
|
||||
"*.lib",
|
||||
"*.a",
|
||||
"*.map",
|
||||
".npmrc"
|
||||
],
|
||||
"default": {
|
||||
"default_prompt": "Enter your prompt here",
|
||||
"default_include_all_files": false,
|
||||
"default_include_entire_project_structure": true
|
||||
},
|
||||
"included_patterns": [
|
||||
"build.gradle",
|
||||
"settings.gradle",
|
||||
"gradle.properties",
|
||||
"pom.xml",
|
||||
"Makefile",
|
||||
"CMakeLists.txt",
|
||||
"package.json",
|
||||
"requirements.txt",
|
||||
"Pipfile",
|
||||
"Gemfile",
|
||||
"composer.json",
|
||||
".editorconfig",
|
||||
".eslintrc.json",
|
||||
".eslintrc.js",
|
||||
".prettierrc",
|
||||
".babelrc",
|
||||
".dockerignore",
|
||||
".gitattributes",
|
||||
".stylelintrc",
|
||||
".npmrc"
|
||||
]
|
||||
{
|
||||
"excluded_patterns": [
|
||||
".git",
|
||||
".gitignore",
|
||||
"gradle",
|
||||
"gradlew",
|
||||
"gradlew.*",
|
||||
"node_modules",
|
||||
".snapshots",
|
||||
".idea",
|
||||
".vscode",
|
||||
"*.log",
|
||||
"*.tmp",
|
||||
"target",
|
||||
"dist",
|
||||
"build",
|
||||
".DS_Store",
|
||||
"*.bak",
|
||||
"*.swp",
|
||||
"*.swo",
|
||||
"*.lock",
|
||||
"*.iml",
|
||||
"coverage",
|
||||
"*.min.js",
|
||||
"*.min.css",
|
||||
"__pycache__",
|
||||
".marketing",
|
||||
".env",
|
||||
".env.*",
|
||||
"*.jpg",
|
||||
"*.jpeg",
|
||||
"*.png",
|
||||
"*.gif",
|
||||
"*.bmp",
|
||||
"*.tiff",
|
||||
"*.ico",
|
||||
"*.svg",
|
||||
"*.webp",
|
||||
"*.psd",
|
||||
"*.ai",
|
||||
"*.eps",
|
||||
"*.indd",
|
||||
"*.raw",
|
||||
"*.cr2",
|
||||
"*.nef",
|
||||
"*.mp4",
|
||||
"*.mov",
|
||||
"*.avi",
|
||||
"*.wmv",
|
||||
"*.flv",
|
||||
"*.mkv",
|
||||
"*.webm",
|
||||
"*.m4v",
|
||||
"*.wfp",
|
||||
"*.prproj",
|
||||
"*.aep",
|
||||
"*.psb",
|
||||
"*.xcf",
|
||||
"*.sketch",
|
||||
"*.fig",
|
||||
"*.xd",
|
||||
"*.db",
|
||||
"*.sqlite",
|
||||
"*.sqlite3",
|
||||
"*.mdb",
|
||||
"*.accdb",
|
||||
"*.frm",
|
||||
"*.myd",
|
||||
"*.myi",
|
||||
"*.ibd",
|
||||
"*.dbf",
|
||||
"*.rdb",
|
||||
"*.aof",
|
||||
"*.pdb",
|
||||
"*.sdb",
|
||||
"*.s3db",
|
||||
"*.ddb",
|
||||
"*.db-shm",
|
||||
"*.db-wal",
|
||||
"*.sqlitedb",
|
||||
"*.sql.gz",
|
||||
"*.bak.sql",
|
||||
"dump.sql",
|
||||
"dump.rdb",
|
||||
"*.vsix",
|
||||
"*.jar",
|
||||
"*.war",
|
||||
"*.ear",
|
||||
"*.zip",
|
||||
"*.tar",
|
||||
"*.tar.gz",
|
||||
"*.tgz",
|
||||
"*.rar",
|
||||
"*.7z",
|
||||
"*.exe",
|
||||
"*.dll",
|
||||
"*.so",
|
||||
"*.dylib",
|
||||
"*.app",
|
||||
"*.dmg",
|
||||
"*.iso",
|
||||
"*.msi",
|
||||
"*.deb",
|
||||
"*.rpm",
|
||||
"*.apk",
|
||||
"*.aab",
|
||||
"*.ipa",
|
||||
"*.pkg",
|
||||
"*.nupkg",
|
||||
"*.snap",
|
||||
"*.whl",
|
||||
"*.gem",
|
||||
"*.pyc",
|
||||
"*.pyo",
|
||||
"*.pyd",
|
||||
"*.class",
|
||||
"*.o",
|
||||
"*.obj",
|
||||
"*.lib",
|
||||
"*.a",
|
||||
"*.map",
|
||||
".npmrc"
|
||||
],
|
||||
"default": {
|
||||
"default_prompt": "Enter your prompt here",
|
||||
"default_include_all_files": false,
|
||||
"default_include_entire_project_structure": true
|
||||
},
|
||||
"included_patterns": [
|
||||
"build.gradle",
|
||||
"settings.gradle",
|
||||
"gradle.properties",
|
||||
"pom.xml",
|
||||
"Makefile",
|
||||
"CMakeLists.txt",
|
||||
"package.json",
|
||||
"requirements.txt",
|
||||
"Pipfile",
|
||||
"Gemfile",
|
||||
"composer.json",
|
||||
".editorconfig",
|
||||
".eslintrc.json",
|
||||
".eslintrc.js",
|
||||
".prettierrc",
|
||||
".babelrc",
|
||||
".dockerignore",
|
||||
".gitattributes",
|
||||
".stylelintrc",
|
||||
".npmrc"
|
||||
]
|
||||
}
|
||||
+11
-11
@@ -1,11 +1,11 @@
|
||||
# Snapshots Directory
|
||||
|
||||
This directory contains snapshots of your code for AI interactions. Each snapshot is a markdown file that includes relevant code context and project structure information.
|
||||
|
||||
## What's included in snapshots?
|
||||
- Selected code files and their contents
|
||||
- Project structure (if enabled)
|
||||
- Your prompt/question for the AI
|
||||
|
||||
## Configuration
|
||||
You can customize snapshot behavior in `config.json`.
|
||||
# Snapshots Directory
|
||||
|
||||
This directory contains snapshots of your code for AI interactions. Each snapshot is a markdown file that includes relevant code context and project structure information.
|
||||
|
||||
## What's included in snapshots?
|
||||
- Selected code files and their contents
|
||||
- Project structure (if enabled)
|
||||
- Your prompt/question for the AI
|
||||
|
||||
## Configuration
|
||||
You can customize snapshot behavior in `config.json`.
|
||||
|
||||
+44
-44
@@ -1,44 +1,44 @@
|
||||
# Thank you for using Snapshots for AI
|
||||
|
||||
Thanks for using Snapshots for AI. We hope this tool has helped you solve a problem or two.
|
||||
|
||||
If you would like to support our work, please help us by considering the following offers and requests:
|
||||
|
||||
## Ways to Support
|
||||
|
||||
### Join the GBTI Network!!! 🙏🙏🙏
|
||||
The GBTI Network is a community of developers who are passionate about open source and community-driven development. Members enjoy access to exclussive tools, resources, a private MineCraft server, a listing in our members directory, co-op opportunities and more.
|
||||
|
||||
- Support our work by becoming a [GBTI Network member](https://gbti.network/membership/).
|
||||
|
||||
### Try out BugHerd 🐛
|
||||
BugHerd is a visual feedback and bug-tracking tool designed to streamline website development by enabling users to pin feedback directly onto web pages. This approach facilitates clear communication among clients, designers, developers, and project managers.
|
||||
|
||||
- Start your free trial with [BugHerd](https://partners.bugherd.com/55z6c8az8rvr) today.
|
||||
|
||||
### Hire Developers from Codeable 👥
|
||||
Codeable connects you with top-tier professionals skilled in frameworks and technologies such as Laravel, React, Django, Node, Vue.js, Angular, Ruby on Rails, and Node.js. Don't let the WordPress focus discourage you. Codeable experts do it all.
|
||||
|
||||
- Visit [Codeable](https://www.codeable.io/developers/?ref=z8h3e) to hire your next team member.
|
||||
|
||||
### Lead positive reviews on our marketplace listing ⭐⭐⭐⭐⭐
|
||||
- Rate us on [VSCode marketplace](https://marketplace.visualstudio.com/items?itemName=GBTI.snapshots-for-ai)
|
||||
- Review us on [Cursor marketplace](https://open-vsx.org/extension/GBTI/snapshots-for-ai)
|
||||
|
||||
### Star Our GitHub Repository ⭐
|
||||
- Star and watch our [repository](https://github.com/gbti-network/vscode-snapshots-for-ai)
|
||||
|
||||
### 📡 Stay Connected
|
||||
Follow us on your favorite platforms for updates, news, and community discussions:
|
||||
- **[Twitter/X](https://twitter.com/gbti_network)**
|
||||
- **[GitHub](https://github.com/gbti-network)**
|
||||
- **[YouTube](https://www.youtube.com/channel/UCh4FjB6r4oWQW-QFiwqv-UA)**
|
||||
- **[Dev.to](https://dev.to/gbti)**
|
||||
- **[Daily.dev](https://dly.to/zfCriM6JfRF)**
|
||||
- **[Hashnode](https://gbti.hashnode.dev/)**
|
||||
- **[Discord Community](https://gbti.network)**
|
||||
- **[Reddit Community](https://www.reddit.com/r/GBTI_network)**
|
||||
|
||||
---
|
||||
|
||||
Thank you for supporting open source software! 🙏
|
||||
# Thank you for using Snapshots for AI
|
||||
|
||||
Thanks for using Snapshots for AI. We hope this tool has helped you solve a problem or two.
|
||||
|
||||
If you would like to support our work, please help us by considering the following offers and requests:
|
||||
|
||||
## Ways to Support
|
||||
|
||||
### Join the GBTI Network!!! 🙏🙏🙏
|
||||
The GBTI Network is a community of developers who are passionate about open source and community-driven development. Members enjoy access to exclussive tools, resources, a private MineCraft server, a listing in our members directory, co-op opportunities and more.
|
||||
|
||||
- Support our work by becoming a [GBTI Network member](https://gbti.network/membership/).
|
||||
|
||||
### Try out BugHerd 🐛
|
||||
BugHerd is a visual feedback and bug-tracking tool designed to streamline website development by enabling users to pin feedback directly onto web pages. This approach facilitates clear communication among clients, designers, developers, and project managers.
|
||||
|
||||
- Start your free trial with [BugHerd](https://partners.bugherd.com/55z6c8az8rvr) today.
|
||||
|
||||
### Hire Developers from Codeable 👥
|
||||
Codeable connects you with top-tier professionals skilled in frameworks and technologies such as Laravel, React, Django, Node, Vue.js, Angular, Ruby on Rails, and Node.js. Don't let the WordPress focus discourage you. Codeable experts do it all.
|
||||
|
||||
- Visit [Codeable](https://www.codeable.io/developers/?ref=z8h3e) to hire your next team member.
|
||||
|
||||
### Lead positive reviews on our marketplace listing ⭐⭐⭐⭐⭐
|
||||
- Rate us on [VSCode marketplace](https://marketplace.visualstudio.com/items?itemName=GBTI.snapshots-for-ai)
|
||||
- Review us on [Cursor marketplace](https://open-vsx.org/extension/GBTI/snapshots-for-ai)
|
||||
|
||||
### Star Our GitHub Repository ⭐
|
||||
- Star and watch our [repository](https://github.com/gbti-network/vscode-snapshots-for-ai)
|
||||
|
||||
### 📡 Stay Connected
|
||||
Follow us on your favorite platforms for updates, news, and community discussions:
|
||||
- **[Twitter/X](https://twitter.com/gbti_network)**
|
||||
- **[GitHub](https://github.com/gbti-network)**
|
||||
- **[YouTube](https://www.youtube.com/channel/UCh4FjB6r4oWQW-QFiwqv-UA)**
|
||||
- **[Dev.to](https://dev.to/gbti)**
|
||||
- **[Daily.dev](https://dly.to/zfCriM6JfRF)**
|
||||
- **[Hashnode](https://gbti.hashnode.dev/)**
|
||||
- **[Discord Community](https://gbti.network)**
|
||||
- **[Reddit Community](https://www.reddit.com/r/GBTI_network)**
|
||||
|
||||
---
|
||||
|
||||
Thank you for supporting open source software! 🙏
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
# TMTC Client
|
||||
to interface the Host OBSW based on FSFW.
|
||||
|
||||
commands assuming the use of linux:
|
||||
## Creating a virtual environment
|
||||
|
||||
> :warning: Note that you may have to install python-venv before you can create a virtual environment. For a debian based system that would be `sudo apt install python3-venv`
|
||||
|
||||
> Note that when using the GUI in a virtualenv, you need to initialize the venv with system packages:
|
||||
```sh
|
||||
# Creates the virtual environment venv in the folder venv
|
||||
# Add the --system-site-packages flag only if you plan on using the GUI
|
||||
python3 -m venv venv --system-site-packages
|
||||
# Activate venv
|
||||
source ./venv/bin/activate
|
||||
```
|
||||
|
||||
2. Install all dependencies
|
||||
|
||||
|
||||
## install dependencies
|
||||
### a: (recommended) installing them as classic python packets
|
||||
```sh
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
### b: (not recommended) installing them in editable mode instead
|
||||
recommended if you actively develop the code in that repo.
|
||||
```sh
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
# Execute
|
||||
To run the `tmtc` Command Line Interface (CLI)
|
||||
```sh
|
||||
(venv) python main.py
|
||||
```
|
||||
In the program
|
||||
+ 1
|
||||
+ select `localhost`, `port` of your choice e.g. `7301`. After the first run, a tmtc_config.json is present and stores the preset data. The OBSW tells you on which port it is listening when starting up. 
|
||||
+ `:p` to print the command tree
|
||||
+ to execute commands from the `tmtc CLI` you look into the device tree and execute the respective command split by slashes.
|
||||
```sh
|
||||
test/ping # example to execute the ping command (PUS 17,1)
|
||||
```
|
||||
see below a screenshot to visualize the steps above.
|
||||

|
||||
|
||||
|
||||
In order for all of this to work, you must have your hosted OBSW running in another terminal listening to the correct port!
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 162 KiB |
@@ -1,103 +1,103 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Example client for the sat-rs example application"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
import tmtccmd
|
||||
from spacepackets.ecss import PusVerificator
|
||||
|
||||
from tmtccmd import ProcedureParamsWrapper
|
||||
from tmtccmd.core.base import BackendRequest
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import CcsdsTmHandler
|
||||
from tmtccmd.config import (
|
||||
default_json_path,
|
||||
SetupParams,
|
||||
params_to_procedure_conversion,
|
||||
)
|
||||
from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper
|
||||
from tmtccmd.logging import add_colorlog_console_logger
|
||||
from tmtccmd.logging.pus import (
|
||||
RegularTmtcLogWrapper,
|
||||
RawTmtcTimedLogWrapper,
|
||||
TimedLogWhen,
|
||||
)
|
||||
from spacepackets.seqcount import PusFileSeqCountProvider
|
||||
|
||||
|
||||
from pytmtc.config import SatrsConfigHook
|
||||
from pytmtc.pus_tc import TcHandler
|
||||
from pytmtc.pus_tm import PusHandler
|
||||
|
||||
_LOGGER = logging.getLogger()
|
||||
|
||||
|
||||
def main():
|
||||
add_colorlog_console_logger(_LOGGER)
|
||||
tmtccmd.init_printout(False)
|
||||
hook_obj = SatrsConfigHook(json_cfg_path=default_json_path())
|
||||
parser_wrapper = PreArgsParsingWrapper()
|
||||
parser_wrapper.create_default_parent_parser()
|
||||
parser_wrapper.create_default_parser()
|
||||
parser_wrapper.add_def_proc_args()
|
||||
params = SetupParams()
|
||||
post_args_wrapper = parser_wrapper.parse(hook_obj, params)
|
||||
proc_wrapper = ProcedureParamsWrapper()
|
||||
if post_args_wrapper.use_gui:
|
||||
post_args_wrapper.set_params_without_prompts(proc_wrapper)
|
||||
else:
|
||||
post_args_wrapper.set_params_with_prompts(proc_wrapper)
|
||||
setup_args = SetupWrapper(
|
||||
hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper
|
||||
)
|
||||
# Create console logger helper and file loggers
|
||||
tmtc_logger = RegularTmtcLogWrapper()
|
||||
file_logger = tmtc_logger.logger
|
||||
raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1)
|
||||
verificator = PusVerificator()
|
||||
verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger)
|
||||
# Create primary TM handler and add it to the CCSDS Packet Handler
|
||||
tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger)
|
||||
ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler)
|
||||
# TODO: We could add the CFDP handler for the CFDP APID at a later stage.
|
||||
# ccsds_handler.add_apid_handler(tm_handler)
|
||||
|
||||
# Create TC handler
|
||||
seq_count_provider = PusFileSeqCountProvider()
|
||||
tc_handler = TcHandler(seq_count_provider, verification_wrapper)
|
||||
tmtccmd.setup(setup_args=setup_args)
|
||||
init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper)
|
||||
tmtc_backend = tmtccmd.create_default_tmtc_backend(
|
||||
setup_wrapper=setup_args,
|
||||
tm_handler=ccsds_handler,
|
||||
tc_handler=tc_handler,
|
||||
init_procedure=init_proc,
|
||||
)
|
||||
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj)
|
||||
try:
|
||||
while True:
|
||||
state = tmtc_backend.periodic_op(None)
|
||||
if state.request == BackendRequest.TERMINATION_NO_ERROR:
|
||||
tmtc_backend.close_com_if()
|
||||
sys.exit(0)
|
||||
elif state.request == BackendRequest.DELAY_IDLE:
|
||||
_LOGGER.info("TMTC Client in IDLE mode")
|
||||
time.sleep(3.0)
|
||||
elif state.request == BackendRequest.DELAY_LISTENER:
|
||||
time.sleep(0.8)
|
||||
elif state.request == BackendRequest.DELAY_CUSTOM:
|
||||
if state.next_delay.total_seconds() <= 0.4:
|
||||
time.sleep(state.next_delay.total_seconds())
|
||||
else:
|
||||
time.sleep(0.4)
|
||||
elif state.request == BackendRequest.CALL_NEXT:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
tmtc_backend.close_com_if()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
#!/usr/bin/env python3
|
||||
"""Example client for the sat-rs example application"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
import tmtccmd
|
||||
from spacepackets.ecss import PusVerificator
|
||||
|
||||
from tmtccmd import ProcedureParamsWrapper
|
||||
from tmtccmd.core.base import BackendRequest
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import CcsdsTmHandler
|
||||
from tmtccmd.config import (
|
||||
default_json_path,
|
||||
SetupParams,
|
||||
params_to_procedure_conversion,
|
||||
)
|
||||
from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper
|
||||
from tmtccmd.logging import add_colorlog_console_logger
|
||||
from tmtccmd.logging.pus import (
|
||||
RegularTmtcLogWrapper,
|
||||
RawTmtcTimedLogWrapper,
|
||||
TimedLogWhen,
|
||||
)
|
||||
from spacepackets.seqcount import PusFileSeqCountProvider
|
||||
|
||||
|
||||
from pytmtc.config import SatrsConfigHook
|
||||
from pytmtc.pus_tc import TcHandler
|
||||
from pytmtc.pus_tm import PusHandler
|
||||
|
||||
_LOGGER = logging.getLogger()
|
||||
|
||||
|
||||
def main():
|
||||
add_colorlog_console_logger(_LOGGER)
|
||||
tmtccmd.init_printout(False)
|
||||
hook_obj = SatrsConfigHook(json_cfg_path=default_json_path()) # this is our entry point
|
||||
parser_wrapper = PreArgsParsingWrapper()
|
||||
parser_wrapper.create_default_parent_parser()
|
||||
parser_wrapper.create_default_parser()
|
||||
parser_wrapper.add_def_proc_args()
|
||||
params = SetupParams()
|
||||
post_args_wrapper = parser_wrapper.parse(hook_obj, params)
|
||||
proc_wrapper = ProcedureParamsWrapper()
|
||||
if post_args_wrapper.use_gui:
|
||||
post_args_wrapper.set_params_without_prompts(proc_wrapper)
|
||||
else:
|
||||
post_args_wrapper.set_params_with_prompts(proc_wrapper)
|
||||
setup_args = SetupWrapper(
|
||||
hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper
|
||||
)
|
||||
# Create console logger helper and file loggers
|
||||
tmtc_logger = RegularTmtcLogWrapper()
|
||||
file_logger = tmtc_logger.logger
|
||||
raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1)
|
||||
verificator = PusVerificator()
|
||||
verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger)
|
||||
# Create primary TM handler and add it to the CCSDS Packet Handler
|
||||
tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger)
|
||||
ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler)
|
||||
# TODO: We could add the CFDP handler for the CFDP APID at a later stage.
|
||||
# ccsds_handler.add_apid_handler(tm_handler)
|
||||
|
||||
# Create TC handler
|
||||
seq_count_provider = PusFileSeqCountProvider()
|
||||
tc_handler = TcHandler(seq_count_provider, verification_wrapper)
|
||||
tmtccmd.setup(setup_args=setup_args)
|
||||
init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper)
|
||||
tmtc_backend = tmtccmd.create_default_tmtc_backend(
|
||||
setup_wrapper=setup_args,
|
||||
tm_handler=ccsds_handler,
|
||||
tc_handler=tc_handler,
|
||||
init_procedure=init_proc,
|
||||
)
|
||||
tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj)
|
||||
try:
|
||||
while True:
|
||||
state = tmtc_backend.periodic_op(None)
|
||||
if state.request == BackendRequest.TERMINATION_NO_ERROR:
|
||||
tmtc_backend.close_com_if()
|
||||
sys.exit(0)
|
||||
elif state.request == BackendRequest.DELAY_IDLE:
|
||||
_LOGGER.info("TMTC Client in IDLE mode")
|
||||
time.sleep(3.0)
|
||||
elif state.request == BackendRequest.DELAY_LISTENER:
|
||||
time.sleep(0.8)
|
||||
elif state.request == BackendRequest.DELAY_CUSTOM:
|
||||
if state.next_delay.total_seconds() <= 0.4:
|
||||
time.sleep(state.next_delay.total_seconds())
|
||||
else:
|
||||
time.sleep(0.4)
|
||||
elif state.request == BackendRequest.CALL_NEXT:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
tmtc_backend.close_com_if()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
+28
-27
@@ -1,27 +1,28 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "pytmtc"
|
||||
description = "Python TMTC client for OPS-SAT"
|
||||
readme = "README.md"
|
||||
version = "0.1.0"
|
||||
requires-python = ">=3.8"
|
||||
authors = [
|
||||
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"},
|
||||
]
|
||||
dependencies = [
|
||||
"tmtccmd~=8.1",
|
||||
"pydantic~=2.7"
|
||||
]
|
||||
|
||||
[tool.setuptools.packages]
|
||||
find = {}
|
||||
|
||||
[tool.ruff]
|
||||
extend-exclude = ["archive"]
|
||||
[tool.ruff.lint]
|
||||
ignore = ["E501"]
|
||||
[tool.ruff.lint.extend-per-file-ignores]
|
||||
"__init__.py" = ["F401"]
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "pytmtc"
|
||||
description = "Python TMTC client for Host Build"
|
||||
readme = "README.md"
|
||||
version = "0.1.0"
|
||||
requires-python = ">=3.8"
|
||||
authors = [
|
||||
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"},
|
||||
]
|
||||
dependencies = [
|
||||
"tmtccmd~=8.2",
|
||||
"spacepackets~=0.28.0",
|
||||
"pydantic~=2.7"
|
||||
]
|
||||
|
||||
[tool.setuptools.packages]
|
||||
find = {}
|
||||
|
||||
[tool.ruff]
|
||||
extend-exclude = ["archive"]
|
||||
[tool.ruff.lint]
|
||||
ignore = ["E501"]
|
||||
[tool.ruff.lint.extend-per-file-ignores]
|
||||
"__init__.py" = ["F401"]
|
||||
|
||||
+11
-11
@@ -1,11 +1,11 @@
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
|
||||
|
||||
def create_acs_node(mode_node: CmdTreeNode, hk_node: CmdTreeNode) -> CmdTreeNode:
|
||||
acs_node = CmdTreeNode("acs", "ACS Subsystem Node")
|
||||
mgm_node = CmdTreeNode("mgms", "MGM devices node")
|
||||
mgm_node.add_child(mode_node)
|
||||
mgm_node.add_child(hk_node)
|
||||
|
||||
acs_node.add_child(mgm_node)
|
||||
return acs_node
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
|
||||
|
||||
def create_acs_node(mode_node: CmdTreeNode, hk_node: CmdTreeNode) -> CmdTreeNode:
|
||||
acs_node = CmdTreeNode("acs", "ACS Subsystem Node")
|
||||
mgm_node = CmdTreeNode("mgms", "MGM devices node")
|
||||
mgm_node.add_child(mode_node)
|
||||
mgm_node.add_child(hk_node)
|
||||
|
||||
acs_node.add_child(mgm_node)
|
||||
return acs_node
|
||||
|
||||
+45
-45
@@ -1,45 +1,45 @@
|
||||
import logging
|
||||
import struct
|
||||
import enum
|
||||
from typing import List
|
||||
from spacepackets.ecss import PusTm
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
from pytmtc.common import AcsId, Apid
|
||||
from pytmtc.hk_common import create_request_one_shot_hk_cmd
|
||||
from pytmtc.mode import handle_set_mode_cmd
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
SENSOR_SET = 0
|
||||
|
||||
|
||||
def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]):
|
||||
assert len(cmd_path) >= 3
|
||||
if cmd_path[2] == "hk":
|
||||
if cmd_path[3] == "one_shot_hk":
|
||||
q.add_log_cmd("Sending HK one shot request")
|
||||
q.add_pus_tc(
|
||||
create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET)
|
||||
)
|
||||
|
||||
if cmd_path[2] == "mode":
|
||||
if cmd_path[3] == "set_mode":
|
||||
handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0)
|
||||
|
||||
|
||||
def handle_mgm_hk_report(pus_tm: PusTm, set_id: int, hk_data: bytes):
|
||||
if set_id == SetId.SENSOR_SET:
|
||||
if len(hk_data) != 13:
|
||||
raise ValueError(f"invalid HK data length, expected 13, got {len(hk_data)}")
|
||||
data_valid = hk_data[0]
|
||||
mgm_x = struct.unpack("!f", hk_data[1:5])[0]
|
||||
mgm_y = struct.unpack("!f", hk_data[5:9])[0]
|
||||
mgm_z = struct.unpack("!f", hk_data[9:13])[0]
|
||||
_LOGGER.info(
|
||||
f"received MGM HK set in uT: Valid {data_valid} X {mgm_x} Y {mgm_y} Z {mgm_z}"
|
||||
)
|
||||
pass
|
||||
import logging
|
||||
import struct
|
||||
import enum
|
||||
from typing import List
|
||||
from spacepackets.ecss import PusTm
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
from pytmtc.common import AcsId, Apid
|
||||
from pytmtc.hk_common import create_request_one_shot_hk_cmd
|
||||
from pytmtc.mode import handle_set_mode_cmd
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
SENSOR_SET = 0
|
||||
|
||||
|
||||
def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]):
|
||||
assert len(cmd_path) >= 3
|
||||
if cmd_path[2] == "hk":
|
||||
if cmd_path[3] == "one_shot_hk":
|
||||
q.add_log_cmd("Sending HK one shot request")
|
||||
q.add_pus_tc(
|
||||
create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET)
|
||||
)
|
||||
|
||||
if cmd_path[2] == "mode":
|
||||
if cmd_path[3] == "set_mode":
|
||||
handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0)
|
||||
|
||||
|
||||
def handle_mgm_hk_report(pus_tm: PusTm, set_id: int, hk_data: bytes):
|
||||
if set_id == SetId.SENSOR_SET:
|
||||
if len(hk_data) != 13:
|
||||
raise ValueError(f"invalid HK data length, expected 13, got {len(hk_data)}")
|
||||
data_valid = hk_data[0]
|
||||
mgm_x = struct.unpack("!f", hk_data[1:5])[0]
|
||||
mgm_y = struct.unpack("!f", hk_data[5:9])[0]
|
||||
mgm_z = struct.unpack("!f", hk_data[9:13])[0]
|
||||
_LOGGER.info(
|
||||
f"received MGM HK set in uT: Valid {data_valid} X {mgm_x} Y {mgm_y} Z {mgm_z}"
|
||||
)
|
||||
pass
|
||||
|
||||
+55
-55
@@ -1,55 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import enum
|
||||
import struct
|
||||
|
||||
|
||||
class Apid(enum.IntEnum):
|
||||
SCHED = 1
|
||||
GENERIC_PUS = 2
|
||||
ACS = 3
|
||||
CFDP = 4
|
||||
TMTC = 5
|
||||
|
||||
|
||||
class EventSeverity(enum.IntEnum):
|
||||
INFO = 0
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class EventU32:
|
||||
severity: EventSeverity
|
||||
group_id: int
|
||||
unique_id: int
|
||||
|
||||
@classmethod
|
||||
def unpack(cls, data: bytes) -> EventU32:
|
||||
if len(data) < 4:
|
||||
raise ValueError("passed data too short")
|
||||
event_raw = struct.unpack("!I", data[0:4])[0]
|
||||
return cls(
|
||||
severity=EventSeverity((event_raw >> 30) & 0b11),
|
||||
group_id=(event_raw >> 16) & 0x3FFF,
|
||||
unique_id=event_raw & 0xFFFF,
|
||||
)
|
||||
|
||||
|
||||
class AcsId(enum.IntEnum):
|
||||
SUBSYSTEM = 1
|
||||
MGM_ASSEMBLY = 2
|
||||
MGM_0 = 3
|
||||
MGM_1 = 4
|
||||
|
||||
|
||||
class AcsHkIds(enum.IntEnum):
|
||||
MGM_SET = 1
|
||||
|
||||
|
||||
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
|
||||
byte_string = bytearray(struct.pack("!I", target_id))
|
||||
byte_string.extend(struct.pack("!I", unique_id))
|
||||
return byte_string
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import enum
|
||||
import struct
|
||||
|
||||
|
||||
class Apid(enum.IntEnum):
|
||||
SCHED = 1
|
||||
GENERIC_PUS = 0xEF # Test Service 17
|
||||
ACS = 3
|
||||
CFDP = 4
|
||||
TMTC = 5
|
||||
|
||||
|
||||
class EventSeverity(enum.IntEnum):
|
||||
INFO = 0
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class EventU32:
|
||||
severity: EventSeverity
|
||||
group_id: int
|
||||
unique_id: int
|
||||
|
||||
@classmethod
|
||||
def unpack(cls, data: bytes) -> EventU32:
|
||||
if len(data) < 4:
|
||||
raise ValueError("passed data too short")
|
||||
event_raw = struct.unpack("!I", data[0:4])[0]
|
||||
return cls(
|
||||
severity=EventSeverity((event_raw >> 30) & 0b11),
|
||||
group_id=(event_raw >> 16) & 0x3FFF,
|
||||
unique_id=event_raw & 0xFFFF,
|
||||
)
|
||||
|
||||
|
||||
class AcsId(enum.IntEnum):
|
||||
SUBSYSTEM = 1
|
||||
MGM_ASSEMBLY = 2
|
||||
MGM_0 = 3
|
||||
MGM_1 = 4
|
||||
|
||||
|
||||
class AcsHkIds(enum.IntEnum):
|
||||
MGM_SET = 1
|
||||
|
||||
|
||||
def make_addressable_id(target_id: int, unique_id: int) -> bytes:
|
||||
byte_string = bytearray(struct.pack("!I", target_id))
|
||||
byte_string.extend(struct.pack("!I", unique_id))
|
||||
return byte_string
|
||||
|
||||
+47
-47
@@ -1,47 +1,47 @@
|
||||
from typing import Optional
|
||||
from prompt_toolkit.history import FileHistory, History
|
||||
from spacepackets.ccsds import PacketId, PacketType
|
||||
from tmtccmd import HookBase
|
||||
from tmtccmd.com import ComInterface
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from tmtccmd.util.obj_id import ObjectIdDictT
|
||||
|
||||
from pytmtc.common import Apid
|
||||
from pytmtc.pus_tc import create_cmd_definition_tree
|
||||
|
||||
|
||||
class SatrsConfigHook(HookBase):
|
||||
def __init__(self, json_cfg_path: str):
|
||||
super().__init__(json_cfg_path)
|
||||
|
||||
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
|
||||
from tmtccmd.config.com import (
|
||||
create_com_interface_default,
|
||||
create_com_interface_cfg_default,
|
||||
)
|
||||
|
||||
assert self.cfg_path is not None
|
||||
packet_id_list = []
|
||||
for apid in Apid:
|
||||
packet_id_list.append(PacketId(PacketType.TM, True, apid))
|
||||
cfg = create_com_interface_cfg_default(
|
||||
com_if_key=com_if_key,
|
||||
json_cfg_path=self.cfg_path,
|
||||
space_packet_ids=packet_id_list,
|
||||
)
|
||||
assert cfg is not None
|
||||
return create_com_interface_default(cfg)
|
||||
|
||||
def get_command_definitions(self) -> CmdTreeNode:
|
||||
"""This function should return the root node of the command definition tree."""
|
||||
return create_cmd_definition_tree()
|
||||
|
||||
def get_cmd_history(self) -> Optional[History]:
|
||||
"""Optionlly return a history class for the past command paths which will be used
|
||||
when prompting a command path from the user in CLI mode."""
|
||||
return FileHistory(".tmtc-history.txt")
|
||||
|
||||
def get_object_ids(self) -> ObjectIdDictT:
|
||||
from tmtccmd.config.objects import get_core_object_ids
|
||||
|
||||
return get_core_object_ids()
|
||||
from typing import Optional
|
||||
from prompt_toolkit.history import FileHistory, History
|
||||
from spacepackets.ccsds import PacketId, PacketType
|
||||
from tmtccmd import HookBase
|
||||
from tmtccmd.com import ComInterface
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from tmtccmd.util.obj_id import ObjectIdDictT
|
||||
|
||||
from pytmtc.common import Apid
|
||||
from pytmtc.pus_tc import create_cmd_definition_tree
|
||||
|
||||
|
||||
class SatrsConfigHook(HookBase):
|
||||
def __init__(self, json_cfg_path: str):
|
||||
super().__init__(json_cfg_path)
|
||||
|
||||
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
|
||||
from tmtccmd.config.com import (
|
||||
create_com_interface_default,
|
||||
create_com_interface_cfg_default,
|
||||
)
|
||||
|
||||
assert self.cfg_path is not None
|
||||
packet_id_list = []
|
||||
for apid in Apid:
|
||||
packet_id_list.append(PacketId(PacketType.TM, True, apid))
|
||||
cfg = create_com_interface_cfg_default(
|
||||
com_if_key=com_if_key,
|
||||
json_cfg_path=self.cfg_path,
|
||||
space_packet_ids=packet_id_list,
|
||||
)
|
||||
assert cfg is not None
|
||||
return create_com_interface_default(cfg)
|
||||
|
||||
def get_command_definitions(self) -> CmdTreeNode:
|
||||
"""This function should return the root node of the command definition tree."""
|
||||
return create_cmd_definition_tree() # this is the entry point for your TC definition
|
||||
|
||||
def get_cmd_history(self) -> Optional[History]:
|
||||
"""Optionlly return a history class for the past command paths which will be used
|
||||
when prompting a command path from the user in CLI mode."""
|
||||
return FileHistory(".tmtc-history.txt")
|
||||
|
||||
def get_object_ids(self) -> ObjectIdDictT:
|
||||
from tmtccmd.config.objects import get_core_object_ids
|
||||
|
||||
return get_core_object_ids()
|
||||
|
||||
+42
-42
@@ -1,42 +1,42 @@
|
||||
import logging
|
||||
import struct
|
||||
from spacepackets.ecss.pus_3_hk import Subservice
|
||||
from spacepackets.ecss import PusTm
|
||||
|
||||
from pytmtc.common import AcsId, Apid
|
||||
from pytmtc.acs.mgms import handle_mgm_hk_report
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def handle_hk_packet(pus_tm: PusTm):
|
||||
if len(pus_tm.source_data) < 4:
|
||||
raise ValueError("no unique ID in HK packet")
|
||||
unique_id = struct.unpack("!I", pus_tm.source_data[:4])[0]
|
||||
if (
|
||||
pus_tm.subservice == Subservice.TM_HK_REPORT
|
||||
or pus_tm.subservice == Subservice.TM_DIAGNOSTICS_REPORT
|
||||
):
|
||||
if len(pus_tm.source_data) < 8:
|
||||
raise ValueError("no set ID in HK packet")
|
||||
set_id = struct.unpack("!I", pus_tm.source_data[4:8])[0]
|
||||
handle_hk_report(pus_tm, unique_id, set_id)
|
||||
_LOGGER.warning(
|
||||
f"handling for HK packet with subservice {pus_tm.subservice} not implemented yet"
|
||||
)
|
||||
|
||||
|
||||
def handle_hk_report(pus_tm: PusTm, unique_id: int, set_id: int):
|
||||
hk_data = pus_tm.source_data[8:]
|
||||
if pus_tm.apid == Apid.ACS:
|
||||
if unique_id == AcsId.MGM_0:
|
||||
handle_mgm_hk_report(pus_tm, set_id, hk_data)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"handling for HK report with unique ID {unique_id} not implemented yet"
|
||||
)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"handling for HK report with apid {pus_tm.apid} not implemented yet"
|
||||
)
|
||||
import logging
|
||||
import struct
|
||||
from spacepackets.ecss.pus_3_hk import Subservice
|
||||
from spacepackets.ecss import PusTm
|
||||
|
||||
from pytmtc.common import AcsId, Apid
|
||||
from pytmtc.acs.mgms import handle_mgm_hk_report
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def handle_hk_packet(pus_tm: PusTm):
|
||||
if len(pus_tm.source_data) < 4:
|
||||
raise ValueError("no unique ID in HK packet")
|
||||
unique_id = struct.unpack("!I", pus_tm.source_data[:4])[0]
|
||||
if (
|
||||
pus_tm.subservice == Subservice.TM_HK_REPORT
|
||||
or pus_tm.subservice == Subservice.TM_DIAGNOSTICS_REPORT
|
||||
):
|
||||
if len(pus_tm.source_data) < 8:
|
||||
raise ValueError("no set ID in HK packet")
|
||||
set_id = struct.unpack("!I", pus_tm.source_data[4:8])[0]
|
||||
handle_hk_report(pus_tm, unique_id, set_id)
|
||||
_LOGGER.warning(
|
||||
f"handling for HK packet with subservice {pus_tm.subservice} not implemented yet"
|
||||
)
|
||||
|
||||
|
||||
def handle_hk_report(pus_tm: PusTm, unique_id: int, set_id: int):
|
||||
hk_data = pus_tm.source_data[8:]
|
||||
if pus_tm.apid == Apid.ACS:
|
||||
if unique_id == AcsId.MGM_0:
|
||||
handle_mgm_hk_report(pus_tm, set_id, hk_data)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"handling for HK report with unique ID {unique_id} not implemented yet"
|
||||
)
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
f"handling for HK report with apid {pus_tm.apid} not implemented yet"
|
||||
)
|
||||
|
||||
+16
-16
@@ -1,16 +1,16 @@
|
||||
import struct
|
||||
|
||||
from spacepackets.ecss import PusService, PusTc
|
||||
from spacepackets.ecss.pus_3_hk import Subservice
|
||||
|
||||
|
||||
def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc:
|
||||
app_data = bytearray()
|
||||
app_data.extend(struct.pack("!I", unique_id))
|
||||
app_data.extend(struct.pack("!I", set_id))
|
||||
return PusTc(
|
||||
service=PusService.S3_HOUSEKEEPING,
|
||||
subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT,
|
||||
apid=apid,
|
||||
app_data=app_data,
|
||||
)
|
||||
import struct
|
||||
|
||||
from spacepackets.ecss import PusService, PusTc
|
||||
from spacepackets.ecss.pus_3_hk import Subservice
|
||||
|
||||
|
||||
def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc:
|
||||
app_data = bytearray()
|
||||
app_data.extend(struct.pack("!I", unique_id))
|
||||
app_data.extend(struct.pack("!I", set_id))
|
||||
return PusTc(
|
||||
service=PusService.S3_HOUSEKEEPING,
|
||||
subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT,
|
||||
apid=apid,
|
||||
app_data=app_data,
|
||||
)
|
||||
|
||||
+31
-31
@@ -1,31 +1,31 @@
|
||||
import struct
|
||||
from spacepackets.ecss import PusTc
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
|
||||
def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc:
|
||||
app_data = bytearray()
|
||||
app_data.extend(struct.pack("!I", unique_id))
|
||||
app_data.extend(struct.pack("!I", mode))
|
||||
app_data.extend(struct.pack("!H", submode))
|
||||
return PusTc(
|
||||
service=200,
|
||||
subservice=Subservice.TC_MODE_COMMAND,
|
||||
apid=apid,
|
||||
app_data=app_data,
|
||||
)
|
||||
|
||||
|
||||
def handle_set_mode_cmd(
|
||||
q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int
|
||||
):
|
||||
if mode_str == "off":
|
||||
q.add_log_cmd(f"Sending Mode OFF to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0))
|
||||
elif mode_str == "on":
|
||||
q.add_log_cmd(f"Sending Mode ON to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0))
|
||||
elif mode_str == "normal":
|
||||
q.add_log_cmd(f"Sending Mode NORMAL to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0))
|
||||
import struct
|
||||
from spacepackets.ecss import PusTc
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
|
||||
|
||||
def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc:
|
||||
app_data = bytearray()
|
||||
app_data.extend(struct.pack("!I", unique_id))
|
||||
app_data.extend(struct.pack("!I", mode))
|
||||
app_data.extend(struct.pack("!H", submode))
|
||||
return PusTc(
|
||||
service=200,
|
||||
subservice=Subservice.TC_MODE_COMMAND,
|
||||
apid=apid,
|
||||
app_data=app_data,
|
||||
)
|
||||
|
||||
|
||||
def handle_set_mode_cmd(
|
||||
q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int
|
||||
):
|
||||
if mode_str == "off":
|
||||
q.add_log_cmd(f"Sending Mode OFF to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0))
|
||||
elif mode_str == "on":
|
||||
q.add_log_cmd(f"Sending Mode ON to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0))
|
||||
elif mode_str == "normal":
|
||||
q.add_log_cmd(f"Sending Mode NORMAL to {target_str}")
|
||||
q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0))
|
||||
|
||||
+154
-143
@@ -1,143 +1,154 @@
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
from spacepackets.ccsds import CdsShortTimestamp
|
||||
from spacepackets.ecss import PusTelecommand
|
||||
from spacepackets.seqcount import FileSeqCountProvider
|
||||
from tmtccmd import ProcedureWrapper, TcHandlerBase
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import (
|
||||
DefaultPusQueueHelper,
|
||||
FeedWrapper,
|
||||
QueueWrapper,
|
||||
SendCbParams,
|
||||
TcProcedureType,
|
||||
TcQueueEntryType,
|
||||
)
|
||||
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||
|
||||
from pytmtc.acs import create_acs_node
|
||||
from pytmtc.common import Apid
|
||||
from pytmtc.acs.mgms import create_mgm_cmds
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TcHandler(TcHandlerBase):
|
||||
def __init__(
|
||||
self,
|
||||
seq_count_provider: FileSeqCountProvider,
|
||||
verif_wrapper: VerificationWrapper,
|
||||
):
|
||||
super(TcHandler, self).__init__()
|
||||
self.seq_count_provider = seq_count_provider
|
||||
self.verif_wrapper = verif_wrapper
|
||||
self.queue_helper = DefaultPusQueueHelper(
|
||||
queue_wrapper=QueueWrapper.empty(),
|
||||
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
|
||||
seq_cnt_provider=seq_count_provider,
|
||||
pus_verificator=self.verif_wrapper.pus_verificator,
|
||||
default_pus_apid=None,
|
||||
)
|
||||
|
||||
def send_cb(self, send_params: SendCbParams):
|
||||
entry_helper = send_params.entry
|
||||
if entry_helper.is_tc:
|
||||
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
|
||||
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
|
||||
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
||||
_LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
|
||||
send_params.com_if.send(raw_tc)
|
||||
elif entry_helper.entry_type == TcQueueEntryType.LOG:
|
||||
log_entry = entry_helper.to_log_entry()
|
||||
_LOGGER.info(log_entry.log_str)
|
||||
|
||||
def queue_finished_cb(self, info: ProcedureWrapper):
|
||||
if info.proc_type == TcProcedureType.TREE_COMMANDING:
|
||||
def_proc = info.to_tree_commanding_procedure()
|
||||
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
|
||||
|
||||
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
|
||||
q = self.queue_helper
|
||||
q.queue_wrapper = wrapper.queue_wrapper
|
||||
if info.proc_type == TcProcedureType.TREE_COMMANDING:
|
||||
def_proc = info.to_tree_commanding_procedure()
|
||||
assert def_proc.cmd_path is not None
|
||||
pack_pus_telecommands(q, def_proc.cmd_path)
|
||||
|
||||
|
||||
def create_cmd_definition_tree() -> CmdTreeNode:
|
||||
root_node = CmdTreeNode.root_node()
|
||||
|
||||
hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True)
|
||||
hk_node.add_child(CmdTreeNode("one_shot_hk", "Request One Shot HK set"))
|
||||
hk_node.add_child(
|
||||
CmdTreeNode("enable", "Enable periodic housekeeping data generation")
|
||||
)
|
||||
hk_node.add_child(
|
||||
CmdTreeNode("disable", "Disable periodic housekeeping data generation")
|
||||
)
|
||||
|
||||
mode_node = CmdTreeNode("mode", "Mode Node", hide_children_for_print=True)
|
||||
set_mode_node = CmdTreeNode(
|
||||
"set_mode", "Set Node", hide_children_which_are_leaves=True
|
||||
)
|
||||
set_mode_node.add_child(CmdTreeNode("off", "Set OFF Mode"))
|
||||
set_mode_node.add_child(CmdTreeNode("on", "Set ON Mode"))
|
||||
set_mode_node.add_child(CmdTreeNode("normal", "Set NORMAL Mode"))
|
||||
mode_node.add_child(set_mode_node)
|
||||
mode_node.add_child(CmdTreeNode("read_mode", "Read Mode"))
|
||||
|
||||
test_node = CmdTreeNode("test", "Test Node")
|
||||
test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC"))
|
||||
test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event"))
|
||||
root_node.add_child(test_node)
|
||||
|
||||
scheduler_node = CmdTreeNode("scheduler", "Scheduler Node")
|
||||
scheduler_node.add_child(
|
||||
CmdTreeNode(
|
||||
"schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds"
|
||||
)
|
||||
)
|
||||
root_node.add_child(scheduler_node)
|
||||
root_node.add_child(create_acs_node(mode_node, hk_node))
|
||||
return root_node
|
||||
|
||||
|
||||
def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
||||
# It should always be at least the root path "/", so we split of the empty portion left of it.
|
||||
cmd_path_list = cmd_path.split("/")[1:]
|
||||
if len(cmd_path_list) == 0:
|
||||
_LOGGER.warning("empty command path")
|
||||
return
|
||||
if cmd_path_list[0] == "test":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "ping":
|
||||
q.add_log_cmd("Sending PUS ping telecommand")
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1)
|
||||
)
|
||||
elif cmd_path_list[1] == "trigger_event":
|
||||
q.add_log_cmd("Triggering test event")
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128)
|
||||
)
|
||||
if cmd_path_list[0] == "scheduler":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "schedule_ping_10_secs_ahead":
|
||||
q.add_log_cmd("Sending PUS scheduled TC telecommand")
|
||||
crt_time = CdsShortTimestamp.from_now()
|
||||
time_stamp = crt_time + datetime.timedelta(seconds=10)
|
||||
time_stamp = time_stamp.pack()
|
||||
return q.add_pus_tc(
|
||||
create_time_tagged_cmd(
|
||||
time_stamp,
|
||||
PusTelecommand(service=17, subservice=1),
|
||||
apid=Apid.SCHED,
|
||||
)
|
||||
)
|
||||
if cmd_path_list[0] == "acs":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "mgms":
|
||||
create_mgm_cmds(q, cmd_path_list)
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
from spacepackets.ccsds import CdsShortTimestamp
|
||||
from spacepackets.ecss import PusTelecommand
|
||||
from spacepackets.seqcount import FileSeqCountProvider
|
||||
from tmtccmd import ProcedureWrapper, TcHandlerBase
|
||||
from tmtccmd.config import CmdTreeNode
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import (
|
||||
DefaultPusQueueHelper,
|
||||
FeedWrapper,
|
||||
QueueWrapper,
|
||||
SendCbParams,
|
||||
TcProcedureType,
|
||||
TcQueueEntryType,
|
||||
)
|
||||
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||
|
||||
from pytmtc.acs import create_acs_node
|
||||
from pytmtc.common import Apid
|
||||
from pytmtc.acs.mgms import create_mgm_cmds
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TcHandler(TcHandlerBase):
|
||||
def __init__(
|
||||
self,
|
||||
seq_count_provider: FileSeqCountProvider,
|
||||
verif_wrapper: VerificationWrapper,
|
||||
):
|
||||
super(TcHandler, self).__init__()
|
||||
self.seq_count_provider = seq_count_provider
|
||||
self.verif_wrapper = verif_wrapper
|
||||
self.queue_helper = DefaultPusQueueHelper(
|
||||
queue_wrapper=QueueWrapper.empty(),
|
||||
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
|
||||
seq_cnt_provider=seq_count_provider,
|
||||
pus_verificator=self.verif_wrapper.pus_verificator,
|
||||
default_pus_apid=None,
|
||||
)
|
||||
|
||||
def send_cb(self, send_params: SendCbParams):
|
||||
entry_helper = send_params.entry
|
||||
if entry_helper.is_tc:
|
||||
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
|
||||
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
|
||||
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
||||
_LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
|
||||
send_params.com_if.send(raw_tc)
|
||||
elif entry_helper.entry_type == TcQueueEntryType.LOG:
|
||||
log_entry = entry_helper.to_log_entry()
|
||||
_LOGGER.info(log_entry.log_str)
|
||||
|
||||
def queue_finished_cb(self, info: ProcedureWrapper):
|
||||
if info.proc_type == TcProcedureType.TREE_COMMANDING:
|
||||
def_proc = info.to_tree_commanding_procedure()
|
||||
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
|
||||
|
||||
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
|
||||
q = self.queue_helper
|
||||
q.queue_wrapper = wrapper.queue_wrapper
|
||||
if info.proc_type == TcProcedureType.TREE_COMMANDING:
|
||||
def_proc = info.to_tree_commanding_procedure()
|
||||
assert def_proc.cmd_path is not None
|
||||
pack_pus_telecommands(q, def_proc.cmd_path)
|
||||
|
||||
|
||||
def create_cmd_definition_tree() -> CmdTreeNode:
|
||||
root_node = CmdTreeNode.root_node()
|
||||
|
||||
# dummy stuff for testing
|
||||
test_node = CmdTreeNode("test", "Test Node")
|
||||
test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC"))
|
||||
test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event"))
|
||||
root_node.add_child(test_node)
|
||||
|
||||
# here we can see the different interfaces
|
||||
# housekeeping (PUS 3)
|
||||
hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True)
|
||||
hk_node.add_child(CmdTreeNode("one_shot_hk", "Request One Shot HK set"))
|
||||
hk_node.add_child(
|
||||
CmdTreeNode("enable", "Enable periodic housekeeping data generation")
|
||||
)
|
||||
hk_node.add_child(
|
||||
CmdTreeNode("disable", "Disable periodic housekeeping data generation")
|
||||
)
|
||||
|
||||
# mode (PUS 200)
|
||||
mode_node = CmdTreeNode("mode", "Mode Node", hide_children_for_print=True)
|
||||
# create the command to set any mode
|
||||
set_mode_node = CmdTreeNode(
|
||||
"set_mode", "Set Node", hide_children_which_are_leaves=True
|
||||
)
|
||||
# adding the different modes
|
||||
set_mode_node.add_child(CmdTreeNode("off", "Set OFF Mode"))
|
||||
set_mode_node.add_child(CmdTreeNode("on", "Set ON Mode"))
|
||||
set_mode_node.add_child(CmdTreeNode("normal", "Set NORMAL Mode"))
|
||||
# after all mode children are added, the set_mode_node can be added to the parent: mode_node
|
||||
mode_node.add_child(set_mode_node) # system mode commands
|
||||
mode_node.add_child(CmdTreeNode("read_mode", "Read Mode")) # read the current system mode
|
||||
|
||||
|
||||
|
||||
root_node.add_child(create_acs_node(mode_node, hk_node)) # we need to refer the hk & mode nodes to different subsystems or devices, as we want to use them to control a subsystem or device.
|
||||
|
||||
# misc
|
||||
scheduler_node = CmdTreeNode("scheduler", "Scheduler Node")
|
||||
scheduler_node.add_child(
|
||||
CmdTreeNode(
|
||||
"schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds"
|
||||
)
|
||||
)
|
||||
root_node.add_child(scheduler_node)
|
||||
return root_node
|
||||
|
||||
|
||||
def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str):
|
||||
# It should always be at least the root path "/", so we split of the empty portion left of it.
|
||||
cmd_path_list = cmd_path.split("/")[1:]
|
||||
if len(cmd_path_list) == 0:
|
||||
_LOGGER.warning("empty command path")
|
||||
return
|
||||
if cmd_path_list[0] == "test":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "ping":
|
||||
q.add_log_cmd("Sending PUS ping telecommand")
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1)
|
||||
)
|
||||
elif cmd_path_list[1] == "trigger_event":
|
||||
q.add_log_cmd("Triggering test event")
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128)
|
||||
)
|
||||
if cmd_path_list[0] == "scheduler":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "schedule_ping_10_secs_ahead":
|
||||
q.add_log_cmd("Sending PUS scheduled TC telecommand")
|
||||
crt_time = CdsShortTimestamp.from_now()
|
||||
time_stamp = crt_time + datetime.timedelta(seconds=10)
|
||||
time_stamp = time_stamp.pack()
|
||||
return q.add_pus_tc(
|
||||
create_time_tagged_cmd(
|
||||
time_stamp,
|
||||
PusTelecommand(service=17, subservice=1),
|
||||
apid=Apid.SCHED,
|
||||
)
|
||||
)
|
||||
if cmd_path_list[0] == "acs":
|
||||
assert len(cmd_path_list) >= 2
|
||||
if cmd_path_list[1] == "mgms":
|
||||
create_mgm_cmds(q, cmd_path_list)
|
||||
|
||||
+93
-93
@@ -1,93 +1,93 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
from spacepackets.ecss import PusTm
|
||||
from spacepackets.ecss.pus_17_test import Service17Tm
|
||||
from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams
|
||||
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import GenericApidHandlerBase
|
||||
|
||||
from pytmtc.common import Apid, EventU32
|
||||
from pytmtc.hk import handle_hk_packet
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PusHandler(GenericApidHandlerBase):
|
||||
def __init__(
|
||||
self,
|
||||
file_logger: logging.Logger,
|
||||
verif_wrapper: VerificationWrapper,
|
||||
raw_logger: RawTmtcTimedLogWrapper,
|
||||
):
|
||||
super().__init__(None)
|
||||
self.file_logger = file_logger
|
||||
self.raw_logger = raw_logger
|
||||
self.verif_wrapper = verif_wrapper
|
||||
|
||||
def handle_tm(self, apid: int, packet: bytes, _user_args: Any):
|
||||
try:
|
||||
pus_tm = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
except ValueError as e:
|
||||
_LOGGER.warning("Could not generate PUS TM object from raw data")
|
||||
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
|
||||
raise e
|
||||
service = pus_tm.service
|
||||
if service == 1:
|
||||
tm_packet = Service1Tm.unpack(
|
||||
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
|
||||
)
|
||||
res = self.verif_wrapper.add_tm(tm_packet)
|
||||
if res is None:
|
||||
_LOGGER.info(
|
||||
f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] "
|
||||
f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}"
|
||||
)
|
||||
_LOGGER.warning(
|
||||
f"No matching telecommand found for {tm_packet.tc_req_id}"
|
||||
)
|
||||
else:
|
||||
self.verif_wrapper.log_to_console(tm_packet, res)
|
||||
self.verif_wrapper.log_to_file(tm_packet, res)
|
||||
elif service == 3:
|
||||
pus_tm = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
handle_hk_packet(pus_tm)
|
||||
elif service == 5:
|
||||
tm_packet = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
src_data = tm_packet.source_data
|
||||
event_u32 = EventU32.unpack(src_data)
|
||||
_LOGGER.info(
|
||||
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
|
||||
)
|
||||
if event_u32.group_id == 0 and event_u32.unique_id == 0:
|
||||
_LOGGER.info("Received test event")
|
||||
elif service == 17:
|
||||
tm_packet = Service17Tm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
if tm_packet.subservice == 2:
|
||||
self.file_logger.info("Received Ping Reply TM[17,2]")
|
||||
_LOGGER.info("Received Ping Reply TM[17,2]")
|
||||
else:
|
||||
self.file_logger.info(
|
||||
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
|
||||
)
|
||||
_LOGGER.info(
|
||||
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
|
||||
)
|
||||
else:
|
||||
_LOGGER.info(
|
||||
f"The service {service} is not implemented in Telemetry Factory"
|
||||
)
|
||||
tm_packet = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
self.raw_logger.log_tm(pus_tm)
|
||||
import logging
|
||||
from typing import Any
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
from spacepackets.ecss import PusTm
|
||||
from spacepackets.ecss.pus_17_test import Service17Tm
|
||||
from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams
|
||||
from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tmtc import GenericApidHandlerBase
|
||||
|
||||
from pytmtc.common import Apid, EventU32
|
||||
from pytmtc.hk import handle_hk_packet
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PusHandler(GenericApidHandlerBase):
|
||||
def __init__(
|
||||
self,
|
||||
file_logger: logging.Logger,
|
||||
verif_wrapper: VerificationWrapper,
|
||||
raw_logger: RawTmtcTimedLogWrapper,
|
||||
):
|
||||
super().__init__(None)
|
||||
self.file_logger = file_logger
|
||||
self.raw_logger = raw_logger
|
||||
self.verif_wrapper = verif_wrapper
|
||||
|
||||
def handle_tm(self, apid: int, packet: bytes, _user_args: Any):
|
||||
try:
|
||||
pus_tm = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
except ValueError as e:
|
||||
_LOGGER.warning("Could not generate PUS TM object from raw data")
|
||||
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
|
||||
raise e
|
||||
service = pus_tm.service
|
||||
if service == 1:
|
||||
tm_packet = Service1Tm.unpack(
|
||||
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
|
||||
)
|
||||
res = self.verif_wrapper.add_tm(tm_packet)
|
||||
if res is None:
|
||||
_LOGGER.info(
|
||||
f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] "
|
||||
f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}"
|
||||
)
|
||||
_LOGGER.warning(
|
||||
f"No matching telecommand found for {tm_packet.tc_req_id}"
|
||||
)
|
||||
else:
|
||||
self.verif_wrapper.log_to_console(tm_packet, res)
|
||||
self.verif_wrapper.log_to_file(tm_packet, res)
|
||||
elif service == 3:
|
||||
pus_tm = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
handle_hk_packet(pus_tm)
|
||||
elif service == 5:
|
||||
tm_packet = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
src_data = tm_packet.source_data
|
||||
event_u32 = EventU32.unpack(src_data)
|
||||
_LOGGER.info(
|
||||
f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}"
|
||||
)
|
||||
if event_u32.group_id == 0 and event_u32.unique_id == 0:
|
||||
_LOGGER.info("Received test event")
|
||||
elif service == 17:
|
||||
tm_packet = Service17Tm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
if tm_packet.subservice == 2:
|
||||
self.file_logger.info("Received Ping Reply TM[17,2]")
|
||||
_LOGGER.info("Received Ping Reply TM[17,2]")
|
||||
else:
|
||||
self.file_logger.info(
|
||||
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
|
||||
)
|
||||
_LOGGER.info(
|
||||
f"Received Test Packet with unknown subservice {tm_packet.subservice}"
|
||||
)
|
||||
else:
|
||||
_LOGGER.info(
|
||||
f"The service {service} is not implemented in Telemetry Factory"
|
||||
)
|
||||
tm_packet = PusTm.unpack(
|
||||
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
self.raw_logger.log_tm(pus_tm)
|
||||
|
||||
+3
-1
@@ -1 +1,3 @@
|
||||
.
|
||||
.
|
||||
spacepackets~=0.28.0
|
||||
tmtccmd~=8.2.0
|
||||
+48
-48
@@ -1,48 +1,48 @@
|
||||
from unittest import TestCase
|
||||
|
||||
from spacepackets.ccsds import CdsShortTimestamp
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper, QueueEntryHelper
|
||||
from tmtccmd.tmtc.queue import QueueWrapper
|
||||
|
||||
from pytmtc.config import SatrsConfigHook
|
||||
from pytmtc.pus_tc import pack_pus_telecommands
|
||||
|
||||
|
||||
class TestTcModules(TestCase):
|
||||
def setUp(self):
|
||||
self.hook = SatrsConfigHook(json_cfg_path="tmtc_conf.json")
|
||||
self.queue_helper = DefaultPusQueueHelper(
|
||||
queue_wrapper=QueueWrapper.empty(),
|
||||
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
|
||||
seq_cnt_provider=None,
|
||||
pus_verificator=None,
|
||||
default_pus_apid=None,
|
||||
)
|
||||
|
||||
def test_cmd_tree_creation_works_without_errors(self):
|
||||
cmd_defs = self.hook.get_command_definitions()
|
||||
self.assertIsNotNone(cmd_defs)
|
||||
|
||||
def test_ping_cmd_generation(self):
|
||||
pack_pus_telecommands(self.queue_helper, "/test/ping")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper = QueueEntryHelper(queue_entry)
|
||||
log_queue = entry_helper.to_log_entry()
|
||||
self.assertEqual(log_queue.log_str, "Sending PUS ping telecommand")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper.entry = queue_entry
|
||||
pus_tc_entry = entry_helper.to_pus_tc_entry()
|
||||
self.assertEqual(pus_tc_entry.pus_tc.service, 17)
|
||||
self.assertEqual(pus_tc_entry.pus_tc.subservice, 1)
|
||||
|
||||
def test_event_trigger_generation(self):
|
||||
pack_pus_telecommands(self.queue_helper, "/test/trigger_event")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper = QueueEntryHelper(queue_entry)
|
||||
log_queue = entry_helper.to_log_entry()
|
||||
self.assertEqual(log_queue.log_str, "Triggering test event")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper.entry = queue_entry
|
||||
pus_tc_entry = entry_helper.to_pus_tc_entry()
|
||||
self.assertEqual(pus_tc_entry.pus_tc.service, 17)
|
||||
self.assertEqual(pus_tc_entry.pus_tc.subservice, 128)
|
||||
from unittest import TestCase
|
||||
|
||||
from spacepackets.ccsds import CdsShortTimestamp
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper, QueueEntryHelper
|
||||
from tmtccmd.tmtc.queue import QueueWrapper
|
||||
|
||||
from pytmtc.config import SatrsConfigHook
|
||||
from pytmtc.pus_tc import pack_pus_telecommands
|
||||
|
||||
|
||||
class TestTcModules(TestCase):
|
||||
def setUp(self):
|
||||
self.hook = SatrsConfigHook(json_cfg_path="tmtc_conf.json")
|
||||
self.queue_helper = DefaultPusQueueHelper(
|
||||
queue_wrapper=QueueWrapper.empty(),
|
||||
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
|
||||
seq_cnt_provider=None,
|
||||
pus_verificator=None,
|
||||
default_pus_apid=None,
|
||||
)
|
||||
|
||||
def test_cmd_tree_creation_works_without_errors(self):
|
||||
cmd_defs = self.hook.get_command_definitions()
|
||||
self.assertIsNotNone(cmd_defs)
|
||||
|
||||
def test_ping_cmd_generation(self):
|
||||
pack_pus_telecommands(self.queue_helper, "/test/ping")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper = QueueEntryHelper(queue_entry)
|
||||
log_queue = entry_helper.to_log_entry()
|
||||
self.assertEqual(log_queue.log_str, "Sending PUS ping telecommand")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper.entry = queue_entry
|
||||
pus_tc_entry = entry_helper.to_pus_tc_entry()
|
||||
self.assertEqual(pus_tc_entry.pus_tc.service, 17)
|
||||
self.assertEqual(pus_tc_entry.pus_tc.subservice, 1)
|
||||
|
||||
def test_event_trigger_generation(self):
|
||||
pack_pus_telecommands(self.queue_helper, "/test/trigger_event")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper = QueueEntryHelper(queue_entry)
|
||||
log_queue = entry_helper.to_log_entry()
|
||||
self.assertEqual(log_queue.log_str, "Triggering test event")
|
||||
queue_entry = self.queue_helper.queue_wrapper.queue.popleft()
|
||||
entry_helper.entry = queue_entry
|
||||
pus_tc_entry = entry_helper.to_pus_tc_entry()
|
||||
self.assertEqual(pus_tc_entry.pus_tc.service, 17)
|
||||
self.assertEqual(pus_tc_entry.pus_tc.subservice, 128)
|
||||
|
||||
Reference in New Issue
Block a user