diff --git a/.gitignore b/.gitignore index 008bdd0..9b48007 100644 --- a/.gitignore +++ b/.gitignore @@ -1,143 +1,143 @@ -/tmtc_conf.json -__pycache__ - -/venv -/log -/.idea/* -!/.idea/runConfigurations - -/seqcnt.txt -/.tmtc-history.txt - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# PyCharm -.idea +/tmtc_conf.json +__pycache__ + +/venv +/log +/.idea/* +!/.idea/runConfigurations + +/seqcnt.txt +/.tmtc-history.txt + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# PyCharm +.idea diff --git a/.snapshots/config.json b/.snapshots/config.json index dfadca2..0919b15 100644 --- a/.snapshots/config.json +++ b/.snapshots/config.json @@ -1,151 +1,151 @@ -{ - "excluded_patterns": [ - ".git", - ".gitignore", - "gradle", - "gradlew", - "gradlew.*", - "node_modules", - ".snapshots", - ".idea", - ".vscode", - "*.log", - "*.tmp", - "target", - "dist", - "build", - ".DS_Store", - "*.bak", - "*.swp", - "*.swo", - "*.lock", - "*.iml", - "coverage", - "*.min.js", - "*.min.css", - "__pycache__", - ".marketing", - ".env", - ".env.*", - "*.jpg", - "*.jpeg", - "*.png", - "*.gif", - "*.bmp", - "*.tiff", - "*.ico", - "*.svg", - "*.webp", - "*.psd", - "*.ai", - "*.eps", - "*.indd", - "*.raw", - "*.cr2", - "*.nef", - "*.mp4", - "*.mov", - "*.avi", - "*.wmv", - "*.flv", - "*.mkv", - "*.webm", - "*.m4v", - "*.wfp", - "*.prproj", - "*.aep", - "*.psb", - "*.xcf", - "*.sketch", - "*.fig", - "*.xd", - "*.db", - "*.sqlite", - "*.sqlite3", - "*.mdb", - "*.accdb", - "*.frm", - "*.myd", - "*.myi", - "*.ibd", - "*.dbf", - "*.rdb", - "*.aof", - "*.pdb", - "*.sdb", - "*.s3db", - "*.ddb", - "*.db-shm", - "*.db-wal", - "*.sqlitedb", - "*.sql.gz", - "*.bak.sql", - "dump.sql", - "dump.rdb", - "*.vsix", - "*.jar", - "*.war", - "*.ear", - "*.zip", - "*.tar", - "*.tar.gz", - "*.tgz", - "*.rar", - "*.7z", - "*.exe", - "*.dll", - "*.so", - "*.dylib", - "*.app", - "*.dmg", - "*.iso", - "*.msi", - "*.deb", - "*.rpm", - "*.apk", - "*.aab", - "*.ipa", - "*.pkg", - "*.nupkg", - "*.snap", - "*.whl", - "*.gem", - "*.pyc", - "*.pyo", - "*.pyd", - "*.class", - "*.o", - "*.obj", - "*.lib", - "*.a", - "*.map", - ".npmrc" - ], - "default": { - "default_prompt": "Enter your prompt here", - "default_include_all_files": false, - "default_include_entire_project_structure": true - }, - "included_patterns": [ - "build.gradle", - "settings.gradle", - "gradle.properties", - "pom.xml", - "Makefile", - "CMakeLists.txt", - "package.json", - "requirements.txt", - "Pipfile", - "Gemfile", - "composer.json", - ".editorconfig", - ".eslintrc.json", - ".eslintrc.js", - ".prettierrc", - ".babelrc", - ".dockerignore", - ".gitattributes", - ".stylelintrc", - ".npmrc" - ] +{ + "excluded_patterns": [ + ".git", + ".gitignore", + "gradle", + "gradlew", + "gradlew.*", + "node_modules", + ".snapshots", + ".idea", + ".vscode", + "*.log", + "*.tmp", + "target", + "dist", + "build", + ".DS_Store", + "*.bak", + "*.swp", + "*.swo", + "*.lock", + "*.iml", + "coverage", + "*.min.js", + "*.min.css", + "__pycache__", + ".marketing", + ".env", + ".env.*", + "*.jpg", + "*.jpeg", + "*.png", + "*.gif", + "*.bmp", + "*.tiff", + "*.ico", + "*.svg", + "*.webp", + "*.psd", + "*.ai", + "*.eps", + "*.indd", + "*.raw", + "*.cr2", + "*.nef", + "*.mp4", + "*.mov", + "*.avi", + "*.wmv", + "*.flv", + "*.mkv", + "*.webm", + "*.m4v", + "*.wfp", + "*.prproj", + "*.aep", + "*.psb", + "*.xcf", + "*.sketch", + "*.fig", + "*.xd", + "*.db", + "*.sqlite", + "*.sqlite3", + "*.mdb", + "*.accdb", + "*.frm", + "*.myd", + "*.myi", + "*.ibd", + "*.dbf", + "*.rdb", + "*.aof", + "*.pdb", + "*.sdb", + "*.s3db", + "*.ddb", + "*.db-shm", + "*.db-wal", + "*.sqlitedb", + "*.sql.gz", + "*.bak.sql", + "dump.sql", + "dump.rdb", + "*.vsix", + "*.jar", + "*.war", + "*.ear", + "*.zip", + "*.tar", + "*.tar.gz", + "*.tgz", + "*.rar", + "*.7z", + "*.exe", + "*.dll", + "*.so", + "*.dylib", + "*.app", + "*.dmg", + "*.iso", + "*.msi", + "*.deb", + "*.rpm", + "*.apk", + "*.aab", + "*.ipa", + "*.pkg", + "*.nupkg", + "*.snap", + "*.whl", + "*.gem", + "*.pyc", + "*.pyo", + "*.pyd", + "*.class", + "*.o", + "*.obj", + "*.lib", + "*.a", + "*.map", + ".npmrc" + ], + "default": { + "default_prompt": "Enter your prompt here", + "default_include_all_files": false, + "default_include_entire_project_structure": true + }, + "included_patterns": [ + "build.gradle", + "settings.gradle", + "gradle.properties", + "pom.xml", + "Makefile", + "CMakeLists.txt", + "package.json", + "requirements.txt", + "Pipfile", + "Gemfile", + "composer.json", + ".editorconfig", + ".eslintrc.json", + ".eslintrc.js", + ".prettierrc", + ".babelrc", + ".dockerignore", + ".gitattributes", + ".stylelintrc", + ".npmrc" + ] } \ No newline at end of file diff --git a/.snapshots/readme.md b/.snapshots/readme.md index 21fa917..aa9c5fd 100644 --- a/.snapshots/readme.md +++ b/.snapshots/readme.md @@ -1,11 +1,11 @@ -# Snapshots Directory - -This directory contains snapshots of your code for AI interactions. Each snapshot is a markdown file that includes relevant code context and project structure information. - -## What's included in snapshots? -- Selected code files and their contents -- Project structure (if enabled) -- Your prompt/question for the AI - -## Configuration -You can customize snapshot behavior in `config.json`. +# Snapshots Directory + +This directory contains snapshots of your code for AI interactions. Each snapshot is a markdown file that includes relevant code context and project structure information. + +## What's included in snapshots? +- Selected code files and their contents +- Project structure (if enabled) +- Your prompt/question for the AI + +## Configuration +You can customize snapshot behavior in `config.json`. diff --git a/.snapshots/sponsors.md b/.snapshots/sponsors.md index 2df337f..a0bdb42 100644 --- a/.snapshots/sponsors.md +++ b/.snapshots/sponsors.md @@ -1,44 +1,44 @@ -# Thank you for using Snapshots for AI - -Thanks for using Snapshots for AI. We hope this tool has helped you solve a problem or two. - -If you would like to support our work, please help us by considering the following offers and requests: - -## Ways to Support - -### Join the GBTI Network!!! 🙏🙏🙏 -The GBTI Network is a community of developers who are passionate about open source and community-driven development. Members enjoy access to exclussive tools, resources, a private MineCraft server, a listing in our members directory, co-op opportunities and more. - -- Support our work by becoming a [GBTI Network member](https://gbti.network/membership/). - -### Try out BugHerd 🐛 -BugHerd is a visual feedback and bug-tracking tool designed to streamline website development by enabling users to pin feedback directly onto web pages. This approach facilitates clear communication among clients, designers, developers, and project managers. - -- Start your free trial with [BugHerd](https://partners.bugherd.com/55z6c8az8rvr) today. - -### Hire Developers from Codeable 👥 -Codeable connects you with top-tier professionals skilled in frameworks and technologies such as Laravel, React, Django, Node, Vue.js, Angular, Ruby on Rails, and Node.js. Don't let the WordPress focus discourage you. Codeable experts do it all. - -- Visit [Codeable](https://www.codeable.io/developers/?ref=z8h3e) to hire your next team member. - -### Lead positive reviews on our marketplace listing ⭐⭐⭐⭐⭐ -- Rate us on [VSCode marketplace](https://marketplace.visualstudio.com/items?itemName=GBTI.snapshots-for-ai) -- Review us on [Cursor marketplace](https://open-vsx.org/extension/GBTI/snapshots-for-ai) - -### Star Our GitHub Repository ⭐ -- Star and watch our [repository](https://github.com/gbti-network/vscode-snapshots-for-ai) - -### 📡 Stay Connected -Follow us on your favorite platforms for updates, news, and community discussions: -- **[Twitter/X](https://twitter.com/gbti_network)** -- **[GitHub](https://github.com/gbti-network)** -- **[YouTube](https://www.youtube.com/channel/UCh4FjB6r4oWQW-QFiwqv-UA)** -- **[Dev.to](https://dev.to/gbti)** -- **[Daily.dev](https://dly.to/zfCriM6JfRF)** -- **[Hashnode](https://gbti.hashnode.dev/)** -- **[Discord Community](https://gbti.network)** -- **[Reddit Community](https://www.reddit.com/r/GBTI_network)** - ---- - -Thank you for supporting open source software! 🙏 +# Thank you for using Snapshots for AI + +Thanks for using Snapshots for AI. We hope this tool has helped you solve a problem or two. + +If you would like to support our work, please help us by considering the following offers and requests: + +## Ways to Support + +### Join the GBTI Network!!! 🙏🙏🙏 +The GBTI Network is a community of developers who are passionate about open source and community-driven development. Members enjoy access to exclussive tools, resources, a private MineCraft server, a listing in our members directory, co-op opportunities and more. + +- Support our work by becoming a [GBTI Network member](https://gbti.network/membership/). + +### Try out BugHerd 🐛 +BugHerd is a visual feedback and bug-tracking tool designed to streamline website development by enabling users to pin feedback directly onto web pages. This approach facilitates clear communication among clients, designers, developers, and project managers. + +- Start your free trial with [BugHerd](https://partners.bugherd.com/55z6c8az8rvr) today. + +### Hire Developers from Codeable 👥 +Codeable connects you with top-tier professionals skilled in frameworks and technologies such as Laravel, React, Django, Node, Vue.js, Angular, Ruby on Rails, and Node.js. Don't let the WordPress focus discourage you. Codeable experts do it all. + +- Visit [Codeable](https://www.codeable.io/developers/?ref=z8h3e) to hire your next team member. + +### Lead positive reviews on our marketplace listing ⭐⭐⭐⭐⭐ +- Rate us on [VSCode marketplace](https://marketplace.visualstudio.com/items?itemName=GBTI.snapshots-for-ai) +- Review us on [Cursor marketplace](https://open-vsx.org/extension/GBTI/snapshots-for-ai) + +### Star Our GitHub Repository ⭐ +- Star and watch our [repository](https://github.com/gbti-network/vscode-snapshots-for-ai) + +### 📡 Stay Connected +Follow us on your favorite platforms for updates, news, and community discussions: +- **[Twitter/X](https://twitter.com/gbti_network)** +- **[GitHub](https://github.com/gbti-network)** +- **[YouTube](https://www.youtube.com/channel/UCh4FjB6r4oWQW-QFiwqv-UA)** +- **[Dev.to](https://dev.to/gbti)** +- **[Daily.dev](https://dly.to/zfCriM6JfRF)** +- **[Hashnode](https://gbti.hashnode.dev/)** +- **[Discord Community](https://gbti.network)** +- **[Reddit Community](https://www.reddit.com/r/GBTI_network)** + +--- + +Thank you for supporting open source software! 🙏 diff --git a/README.md b/README.md new file mode 100644 index 0000000..f7d8cdc --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# TMTC Client +to interface the Host OBSW based on FSFW. + +commands assuming the use of linux: +## Creating a virtual environment + + > :warning: Note that you may have to install python-venv before you can create a virtual environment. For a debian based system that would be `sudo apt install python3-venv` + + > Note that when using the GUI in a virtualenv, you need to initialize the venv with system packages: + ```sh + # Creates the virtual environment venv in the folder venv + # Add the --system-site-packages flag only if you plan on using the GUI + python3 -m venv venv --system-site-packages + # Activate venv + source ./venv/bin/activate + ``` + +2. Install all dependencies + + +## install dependencies +### a: (recommended) installing them as classic python packets +```sh +pip install -r requirements.txt +``` +### b: (not recommended) installing them in editable mode instead +recommended if you actively develop the code in that repo. +```sh +pip install -e . +``` + +# Execute +To run the `tmtc` Command Line Interface (CLI) +```sh +(venv) python main.py +``` +In the program ++ 1 ++ select `localhost`, `port` of your choice e.g. `7301`. After the first run, a tmtc_config.json is present and stores the preset data. The OBSW tells you on which port it is listening when starting up. ![FSFW Port](port.png) ++ `:p` to print the command tree ++ to execute commands from the `tmtc CLI` you look into the device tree and execute the respective command split by slashes. +```sh +test/ping # example to execute the ping command (PUS 17,1) +``` +see below a screenshot to visualize the steps above. +![sending a PING](example_execution_ping.png) + + +In order for all of this to work, you must have your hosted OBSW running in another terminal listening to the correct port! \ No newline at end of file diff --git a/example_execution_ping.png b/example_execution_ping.png new file mode 100644 index 0000000..99aae72 Binary files /dev/null and b/example_execution_ping.png differ diff --git a/main.py b/main.py index 19da674..60575a9 100755 --- a/main.py +++ b/main.py @@ -1,103 +1,103 @@ -#!/usr/bin/env python3 -"""Example client for the sat-rs example application""" - -import logging -import sys -import time - -import tmtccmd -from spacepackets.ecss import PusVerificator - -from tmtccmd import ProcedureParamsWrapper -from tmtccmd.core.base import BackendRequest -from tmtccmd.pus import VerificationWrapper -from tmtccmd.tmtc import CcsdsTmHandler -from tmtccmd.config import ( - default_json_path, - SetupParams, - params_to_procedure_conversion, -) -from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper -from tmtccmd.logging import add_colorlog_console_logger -from tmtccmd.logging.pus import ( - RegularTmtcLogWrapper, - RawTmtcTimedLogWrapper, - TimedLogWhen, -) -from spacepackets.seqcount import PusFileSeqCountProvider - - -from pytmtc.config import SatrsConfigHook -from pytmtc.pus_tc import TcHandler -from pytmtc.pus_tm import PusHandler - -_LOGGER = logging.getLogger() - - -def main(): - add_colorlog_console_logger(_LOGGER) - tmtccmd.init_printout(False) - hook_obj = SatrsConfigHook(json_cfg_path=default_json_path()) - parser_wrapper = PreArgsParsingWrapper() - parser_wrapper.create_default_parent_parser() - parser_wrapper.create_default_parser() - parser_wrapper.add_def_proc_args() - params = SetupParams() - post_args_wrapper = parser_wrapper.parse(hook_obj, params) - proc_wrapper = ProcedureParamsWrapper() - if post_args_wrapper.use_gui: - post_args_wrapper.set_params_without_prompts(proc_wrapper) - else: - post_args_wrapper.set_params_with_prompts(proc_wrapper) - setup_args = SetupWrapper( - hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper - ) - # Create console logger helper and file loggers - tmtc_logger = RegularTmtcLogWrapper() - file_logger = tmtc_logger.logger - raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1) - verificator = PusVerificator() - verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger) - # Create primary TM handler and add it to the CCSDS Packet Handler - tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger) - ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler) - # TODO: We could add the CFDP handler for the CFDP APID at a later stage. - # ccsds_handler.add_apid_handler(tm_handler) - - # Create TC handler - seq_count_provider = PusFileSeqCountProvider() - tc_handler = TcHandler(seq_count_provider, verification_wrapper) - tmtccmd.setup(setup_args=setup_args) - init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper) - tmtc_backend = tmtccmd.create_default_tmtc_backend( - setup_wrapper=setup_args, - tm_handler=ccsds_handler, - tc_handler=tc_handler, - init_procedure=init_proc, - ) - tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj) - try: - while True: - state = tmtc_backend.periodic_op(None) - if state.request == BackendRequest.TERMINATION_NO_ERROR: - tmtc_backend.close_com_if() - sys.exit(0) - elif state.request == BackendRequest.DELAY_IDLE: - _LOGGER.info("TMTC Client in IDLE mode") - time.sleep(3.0) - elif state.request == BackendRequest.DELAY_LISTENER: - time.sleep(0.8) - elif state.request == BackendRequest.DELAY_CUSTOM: - if state.next_delay.total_seconds() <= 0.4: - time.sleep(state.next_delay.total_seconds()) - else: - time.sleep(0.4) - elif state.request == BackendRequest.CALL_NEXT: - pass - except KeyboardInterrupt: - tmtc_backend.close_com_if() - sys.exit(0) - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +"""Example client for the sat-rs example application""" + +import logging +import sys +import time + +import tmtccmd +from spacepackets.ecss import PusVerificator + +from tmtccmd import ProcedureParamsWrapper +from tmtccmd.core.base import BackendRequest +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tmtc import CcsdsTmHandler +from tmtccmd.config import ( + default_json_path, + SetupParams, + params_to_procedure_conversion, +) +from tmtccmd.config import PreArgsParsingWrapper, SetupWrapper +from tmtccmd.logging import add_colorlog_console_logger +from tmtccmd.logging.pus import ( + RegularTmtcLogWrapper, + RawTmtcTimedLogWrapper, + TimedLogWhen, +) +from spacepackets.seqcount import PusFileSeqCountProvider + + +from pytmtc.config import SatrsConfigHook +from pytmtc.pus_tc import TcHandler +from pytmtc.pus_tm import PusHandler + +_LOGGER = logging.getLogger() + + +def main(): + add_colorlog_console_logger(_LOGGER) + tmtccmd.init_printout(False) + hook_obj = SatrsConfigHook(json_cfg_path=default_json_path()) # this is our entry point + parser_wrapper = PreArgsParsingWrapper() + parser_wrapper.create_default_parent_parser() + parser_wrapper.create_default_parser() + parser_wrapper.add_def_proc_args() + params = SetupParams() + post_args_wrapper = parser_wrapper.parse(hook_obj, params) + proc_wrapper = ProcedureParamsWrapper() + if post_args_wrapper.use_gui: + post_args_wrapper.set_params_without_prompts(proc_wrapper) + else: + post_args_wrapper.set_params_with_prompts(proc_wrapper) + setup_args = SetupWrapper( + hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_wrapper + ) + # Create console logger helper and file loggers + tmtc_logger = RegularTmtcLogWrapper() + file_logger = tmtc_logger.logger + raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=1) + verificator = PusVerificator() + verification_wrapper = VerificationWrapper(verificator, _LOGGER, file_logger) + # Create primary TM handler and add it to the CCSDS Packet Handler + tm_handler = PusHandler(file_logger, verification_wrapper, raw_logger) + ccsds_handler = CcsdsTmHandler(generic_handler=tm_handler) + # TODO: We could add the CFDP handler for the CFDP APID at a later stage. + # ccsds_handler.add_apid_handler(tm_handler) + + # Create TC handler + seq_count_provider = PusFileSeqCountProvider() + tc_handler = TcHandler(seq_count_provider, verification_wrapper) + tmtccmd.setup(setup_args=setup_args) + init_proc = params_to_procedure_conversion(setup_args.proc_param_wrapper) + tmtc_backend = tmtccmd.create_default_tmtc_backend( + setup_wrapper=setup_args, + tm_handler=ccsds_handler, + tc_handler=tc_handler, + init_procedure=init_proc, + ) + tmtccmd.start(tmtc_backend=tmtc_backend, hook_obj=hook_obj) + try: + while True: + state = tmtc_backend.periodic_op(None) + if state.request == BackendRequest.TERMINATION_NO_ERROR: + tmtc_backend.close_com_if() + sys.exit(0) + elif state.request == BackendRequest.DELAY_IDLE: + _LOGGER.info("TMTC Client in IDLE mode") + time.sleep(3.0) + elif state.request == BackendRequest.DELAY_LISTENER: + time.sleep(0.8) + elif state.request == BackendRequest.DELAY_CUSTOM: + if state.next_delay.total_seconds() <= 0.4: + time.sleep(state.next_delay.total_seconds()) + else: + time.sleep(0.4) + elif state.request == BackendRequest.CALL_NEXT: + pass + except KeyboardInterrupt: + tmtc_backend.close_com_if() + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/port.png b/port.png new file mode 100644 index 0000000..1273ab4 Binary files /dev/null and b/port.png differ diff --git a/pyproject.toml b/pyproject.toml index bcbb0ab..460c22e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,27 +1,28 @@ -[build-system] -requires = ["setuptools>=61.0"] -build-backend = "setuptools.build_meta" - -[project] -name = "pytmtc" -description = "Python TMTC client for OPS-SAT" -readme = "README.md" -version = "0.1.0" -requires-python = ">=3.8" -authors = [ - {name = "Robin Mueller", email = "robin.mueller.m@gmail.com"}, -] -dependencies = [ - "tmtccmd~=8.1", - "pydantic~=2.7" -] - -[tool.setuptools.packages] -find = {} - -[tool.ruff] -extend-exclude = ["archive"] -[tool.ruff.lint] -ignore = ["E501"] -[tool.ruff.lint.extend-per-file-ignores] -"__init__.py" = ["F401"] +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "pytmtc" +description = "Python TMTC client for Host Build" +readme = "README.md" +version = "0.1.0" +requires-python = ">=3.8" +authors = [ + {name = "Robin Mueller", email = "robin.mueller.m@gmail.com"}, +] +dependencies = [ + "tmtccmd~=8.2", + "spacepackets~=0.28.0", + "pydantic~=2.7" +] + +[tool.setuptools.packages] +find = {} + +[tool.ruff] +extend-exclude = ["archive"] +[tool.ruff.lint] +ignore = ["E501"] +[tool.ruff.lint.extend-per-file-ignores] +"__init__.py" = ["F401"] diff --git a/pytmtc/acs/__init__.py b/pytmtc/acs/__init__.py index 834654d..9bfd44f 100644 --- a/pytmtc/acs/__init__.py +++ b/pytmtc/acs/__init__.py @@ -1,11 +1,11 @@ -from tmtccmd.config import CmdTreeNode - - -def create_acs_node(mode_node: CmdTreeNode, hk_node: CmdTreeNode) -> CmdTreeNode: - acs_node = CmdTreeNode("acs", "ACS Subsystem Node") - mgm_node = CmdTreeNode("mgms", "MGM devices node") - mgm_node.add_child(mode_node) - mgm_node.add_child(hk_node) - - acs_node.add_child(mgm_node) - return acs_node +from tmtccmd.config import CmdTreeNode + + +def create_acs_node(mode_node: CmdTreeNode, hk_node: CmdTreeNode) -> CmdTreeNode: + acs_node = CmdTreeNode("acs", "ACS Subsystem Node") + mgm_node = CmdTreeNode("mgms", "MGM devices node") + mgm_node.add_child(mode_node) + mgm_node.add_child(hk_node) + + acs_node.add_child(mgm_node) + return acs_node diff --git a/pytmtc/acs/mgms.py b/pytmtc/acs/mgms.py index d420b3e..dd88685 100644 --- a/pytmtc/acs/mgms.py +++ b/pytmtc/acs/mgms.py @@ -1,45 +1,45 @@ -import logging -import struct -import enum -from typing import List -from spacepackets.ecss import PusTm -from tmtccmd.tmtc import DefaultPusQueueHelper - -from pytmtc.common import AcsId, Apid -from pytmtc.hk_common import create_request_one_shot_hk_cmd -from pytmtc.mode import handle_set_mode_cmd - - -_LOGGER = logging.getLogger(__name__) - - -class SetId(enum.IntEnum): - SENSOR_SET = 0 - - -def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]): - assert len(cmd_path) >= 3 - if cmd_path[2] == "hk": - if cmd_path[3] == "one_shot_hk": - q.add_log_cmd("Sending HK one shot request") - q.add_pus_tc( - create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET) - ) - - if cmd_path[2] == "mode": - if cmd_path[3] == "set_mode": - handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0) - - -def handle_mgm_hk_report(pus_tm: PusTm, set_id: int, hk_data: bytes): - if set_id == SetId.SENSOR_SET: - if len(hk_data) != 13: - raise ValueError(f"invalid HK data length, expected 13, got {len(hk_data)}") - data_valid = hk_data[0] - mgm_x = struct.unpack("!f", hk_data[1:5])[0] - mgm_y = struct.unpack("!f", hk_data[5:9])[0] - mgm_z = struct.unpack("!f", hk_data[9:13])[0] - _LOGGER.info( - f"received MGM HK set in uT: Valid {data_valid} X {mgm_x} Y {mgm_y} Z {mgm_z}" - ) - pass +import logging +import struct +import enum +from typing import List +from spacepackets.ecss import PusTm +from tmtccmd.tmtc import DefaultPusQueueHelper + +from pytmtc.common import AcsId, Apid +from pytmtc.hk_common import create_request_one_shot_hk_cmd +from pytmtc.mode import handle_set_mode_cmd + + +_LOGGER = logging.getLogger(__name__) + + +class SetId(enum.IntEnum): + SENSOR_SET = 0 + + +def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]): + assert len(cmd_path) >= 3 + if cmd_path[2] == "hk": + if cmd_path[3] == "one_shot_hk": + q.add_log_cmd("Sending HK one shot request") + q.add_pus_tc( + create_request_one_shot_hk_cmd(Apid.ACS, AcsId.MGM_0, SetId.SENSOR_SET) + ) + + if cmd_path[2] == "mode": + if cmd_path[3] == "set_mode": + handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0) + + +def handle_mgm_hk_report(pus_tm: PusTm, set_id: int, hk_data: bytes): + if set_id == SetId.SENSOR_SET: + if len(hk_data) != 13: + raise ValueError(f"invalid HK data length, expected 13, got {len(hk_data)}") + data_valid = hk_data[0] + mgm_x = struct.unpack("!f", hk_data[1:5])[0] + mgm_y = struct.unpack("!f", hk_data[5:9])[0] + mgm_z = struct.unpack("!f", hk_data[9:13])[0] + _LOGGER.info( + f"received MGM HK set in uT: Valid {data_valid} X {mgm_x} Y {mgm_y} Z {mgm_z}" + ) + pass diff --git a/pytmtc/common.py b/pytmtc/common.py index e5a7380..ce0e601 100644 --- a/pytmtc/common.py +++ b/pytmtc/common.py @@ -1,55 +1,55 @@ -from __future__ import annotations - -import dataclasses -import enum -import struct - - -class Apid(enum.IntEnum): - SCHED = 1 - GENERIC_PUS = 2 - ACS = 3 - CFDP = 4 - TMTC = 5 - - -class EventSeverity(enum.IntEnum): - INFO = 0 - LOW = 1 - MEDIUM = 2 - HIGH = 3 - - -@dataclasses.dataclass -class EventU32: - severity: EventSeverity - group_id: int - unique_id: int - - @classmethod - def unpack(cls, data: bytes) -> EventU32: - if len(data) < 4: - raise ValueError("passed data too short") - event_raw = struct.unpack("!I", data[0:4])[0] - return cls( - severity=EventSeverity((event_raw >> 30) & 0b11), - group_id=(event_raw >> 16) & 0x3FFF, - unique_id=event_raw & 0xFFFF, - ) - - -class AcsId(enum.IntEnum): - SUBSYSTEM = 1 - MGM_ASSEMBLY = 2 - MGM_0 = 3 - MGM_1 = 4 - - -class AcsHkIds(enum.IntEnum): - MGM_SET = 1 - - -def make_addressable_id(target_id: int, unique_id: int) -> bytes: - byte_string = bytearray(struct.pack("!I", target_id)) - byte_string.extend(struct.pack("!I", unique_id)) - return byte_string +from __future__ import annotations + +import dataclasses +import enum +import struct + + +class Apid(enum.IntEnum): + SCHED = 1 + GENERIC_PUS = 0xEF # Test Service 17 + ACS = 3 + CFDP = 4 + TMTC = 5 + + +class EventSeverity(enum.IntEnum): + INFO = 0 + LOW = 1 + MEDIUM = 2 + HIGH = 3 + + +@dataclasses.dataclass +class EventU32: + severity: EventSeverity + group_id: int + unique_id: int + + @classmethod + def unpack(cls, data: bytes) -> EventU32: + if len(data) < 4: + raise ValueError("passed data too short") + event_raw = struct.unpack("!I", data[0:4])[0] + return cls( + severity=EventSeverity((event_raw >> 30) & 0b11), + group_id=(event_raw >> 16) & 0x3FFF, + unique_id=event_raw & 0xFFFF, + ) + + +class AcsId(enum.IntEnum): + SUBSYSTEM = 1 + MGM_ASSEMBLY = 2 + MGM_0 = 3 + MGM_1 = 4 + + +class AcsHkIds(enum.IntEnum): + MGM_SET = 1 + + +def make_addressable_id(target_id: int, unique_id: int) -> bytes: + byte_string = bytearray(struct.pack("!I", target_id)) + byte_string.extend(struct.pack("!I", unique_id)) + return byte_string diff --git a/pytmtc/config.py b/pytmtc/config.py index 150b9c1..3425d44 100644 --- a/pytmtc/config.py +++ b/pytmtc/config.py @@ -1,47 +1,47 @@ -from typing import Optional -from prompt_toolkit.history import FileHistory, History -from spacepackets.ccsds import PacketId, PacketType -from tmtccmd import HookBase -from tmtccmd.com import ComInterface -from tmtccmd.config import CmdTreeNode -from tmtccmd.util.obj_id import ObjectIdDictT - -from pytmtc.common import Apid -from pytmtc.pus_tc import create_cmd_definition_tree - - -class SatrsConfigHook(HookBase): - def __init__(self, json_cfg_path: str): - super().__init__(json_cfg_path) - - def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: - from tmtccmd.config.com import ( - create_com_interface_default, - create_com_interface_cfg_default, - ) - - assert self.cfg_path is not None - packet_id_list = [] - for apid in Apid: - packet_id_list.append(PacketId(PacketType.TM, True, apid)) - cfg = create_com_interface_cfg_default( - com_if_key=com_if_key, - json_cfg_path=self.cfg_path, - space_packet_ids=packet_id_list, - ) - assert cfg is not None - return create_com_interface_default(cfg) - - def get_command_definitions(self) -> CmdTreeNode: - """This function should return the root node of the command definition tree.""" - return create_cmd_definition_tree() - - def get_cmd_history(self) -> Optional[History]: - """Optionlly return a history class for the past command paths which will be used - when prompting a command path from the user in CLI mode.""" - return FileHistory(".tmtc-history.txt") - - def get_object_ids(self) -> ObjectIdDictT: - from tmtccmd.config.objects import get_core_object_ids - - return get_core_object_ids() +from typing import Optional +from prompt_toolkit.history import FileHistory, History +from spacepackets.ccsds import PacketId, PacketType +from tmtccmd import HookBase +from tmtccmd.com import ComInterface +from tmtccmd.config import CmdTreeNode +from tmtccmd.util.obj_id import ObjectIdDictT + +from pytmtc.common import Apid +from pytmtc.pus_tc import create_cmd_definition_tree + + +class SatrsConfigHook(HookBase): + def __init__(self, json_cfg_path: str): + super().__init__(json_cfg_path) + + def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]: + from tmtccmd.config.com import ( + create_com_interface_default, + create_com_interface_cfg_default, + ) + + assert self.cfg_path is not None + packet_id_list = [] + for apid in Apid: + packet_id_list.append(PacketId(PacketType.TM, True, apid)) + cfg = create_com_interface_cfg_default( + com_if_key=com_if_key, + json_cfg_path=self.cfg_path, + space_packet_ids=packet_id_list, + ) + assert cfg is not None + return create_com_interface_default(cfg) + + def get_command_definitions(self) -> CmdTreeNode: + """This function should return the root node of the command definition tree.""" + return create_cmd_definition_tree() # this is the entry point for your TC definition + + def get_cmd_history(self) -> Optional[History]: + """Optionlly return a history class for the past command paths which will be used + when prompting a command path from the user in CLI mode.""" + return FileHistory(".tmtc-history.txt") + + def get_object_ids(self) -> ObjectIdDictT: + from tmtccmd.config.objects import get_core_object_ids + + return get_core_object_ids() diff --git a/pytmtc/hk.py b/pytmtc/hk.py index d50f0f1..6f52301 100644 --- a/pytmtc/hk.py +++ b/pytmtc/hk.py @@ -1,42 +1,42 @@ -import logging -import struct -from spacepackets.ecss.pus_3_hk import Subservice -from spacepackets.ecss import PusTm - -from pytmtc.common import AcsId, Apid -from pytmtc.acs.mgms import handle_mgm_hk_report - - -_LOGGER = logging.getLogger(__name__) - - -def handle_hk_packet(pus_tm: PusTm): - if len(pus_tm.source_data) < 4: - raise ValueError("no unique ID in HK packet") - unique_id = struct.unpack("!I", pus_tm.source_data[:4])[0] - if ( - pus_tm.subservice == Subservice.TM_HK_REPORT - or pus_tm.subservice == Subservice.TM_DIAGNOSTICS_REPORT - ): - if len(pus_tm.source_data) < 8: - raise ValueError("no set ID in HK packet") - set_id = struct.unpack("!I", pus_tm.source_data[4:8])[0] - handle_hk_report(pus_tm, unique_id, set_id) - _LOGGER.warning( - f"handling for HK packet with subservice {pus_tm.subservice} not implemented yet" - ) - - -def handle_hk_report(pus_tm: PusTm, unique_id: int, set_id: int): - hk_data = pus_tm.source_data[8:] - if pus_tm.apid == Apid.ACS: - if unique_id == AcsId.MGM_0: - handle_mgm_hk_report(pus_tm, set_id, hk_data) - else: - _LOGGER.warning( - f"handling for HK report with unique ID {unique_id} not implemented yet" - ) - else: - _LOGGER.warning( - f"handling for HK report with apid {pus_tm.apid} not implemented yet" - ) +import logging +import struct +from spacepackets.ecss.pus_3_hk import Subservice +from spacepackets.ecss import PusTm + +from pytmtc.common import AcsId, Apid +from pytmtc.acs.mgms import handle_mgm_hk_report + + +_LOGGER = logging.getLogger(__name__) + + +def handle_hk_packet(pus_tm: PusTm): + if len(pus_tm.source_data) < 4: + raise ValueError("no unique ID in HK packet") + unique_id = struct.unpack("!I", pus_tm.source_data[:4])[0] + if ( + pus_tm.subservice == Subservice.TM_HK_REPORT + or pus_tm.subservice == Subservice.TM_DIAGNOSTICS_REPORT + ): + if len(pus_tm.source_data) < 8: + raise ValueError("no set ID in HK packet") + set_id = struct.unpack("!I", pus_tm.source_data[4:8])[0] + handle_hk_report(pus_tm, unique_id, set_id) + _LOGGER.warning( + f"handling for HK packet with subservice {pus_tm.subservice} not implemented yet" + ) + + +def handle_hk_report(pus_tm: PusTm, unique_id: int, set_id: int): + hk_data = pus_tm.source_data[8:] + if pus_tm.apid == Apid.ACS: + if unique_id == AcsId.MGM_0: + handle_mgm_hk_report(pus_tm, set_id, hk_data) + else: + _LOGGER.warning( + f"handling for HK report with unique ID {unique_id} not implemented yet" + ) + else: + _LOGGER.warning( + f"handling for HK report with apid {pus_tm.apid} not implemented yet" + ) diff --git a/pytmtc/hk_common.py b/pytmtc/hk_common.py index bb9890a..9023d9e 100644 --- a/pytmtc/hk_common.py +++ b/pytmtc/hk_common.py @@ -1,16 +1,16 @@ -import struct - -from spacepackets.ecss import PusService, PusTc -from spacepackets.ecss.pus_3_hk import Subservice - - -def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc: - app_data = bytearray() - app_data.extend(struct.pack("!I", unique_id)) - app_data.extend(struct.pack("!I", set_id)) - return PusTc( - service=PusService.S3_HOUSEKEEPING, - subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT, - apid=apid, - app_data=app_data, - ) +import struct + +from spacepackets.ecss import PusService, PusTc +from spacepackets.ecss.pus_3_hk import Subservice + + +def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc: + app_data = bytearray() + app_data.extend(struct.pack("!I", unique_id)) + app_data.extend(struct.pack("!I", set_id)) + return PusTc( + service=PusService.S3_HOUSEKEEPING, + subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT, + apid=apid, + app_data=app_data, + ) diff --git a/pytmtc/mode.py b/pytmtc/mode.py index 918fdb1..3c66309 100644 --- a/pytmtc/mode.py +++ b/pytmtc/mode.py @@ -1,31 +1,31 @@ -import struct -from spacepackets.ecss import PusTc -from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice -from tmtccmd.tmtc import DefaultPusQueueHelper - - -def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc: - app_data = bytearray() - app_data.extend(struct.pack("!I", unique_id)) - app_data.extend(struct.pack("!I", mode)) - app_data.extend(struct.pack("!H", submode)) - return PusTc( - service=200, - subservice=Subservice.TC_MODE_COMMAND, - apid=apid, - app_data=app_data, - ) - - -def handle_set_mode_cmd( - q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int -): - if mode_str == "off": - q.add_log_cmd(f"Sending Mode OFF to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0)) - elif mode_str == "on": - q.add_log_cmd(f"Sending Mode ON to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0)) - elif mode_str == "normal": - q.add_log_cmd(f"Sending Mode NORMAL to {target_str}") - q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0)) +import struct +from spacepackets.ecss import PusTc +from tmtccmd.pus.s200_fsfw_mode import Mode, Subservice +from tmtccmd.tmtc import DefaultPusQueueHelper + + +def create_set_mode_cmd(apid: int, unique_id: int, mode: int, submode: int) -> PusTc: + app_data = bytearray() + app_data.extend(struct.pack("!I", unique_id)) + app_data.extend(struct.pack("!I", mode)) + app_data.extend(struct.pack("!H", submode)) + return PusTc( + service=200, + subservice=Subservice.TC_MODE_COMMAND, + apid=apid, + app_data=app_data, + ) + + +def handle_set_mode_cmd( + q: DefaultPusQueueHelper, target_str: str, mode_str: str, apid: int, unique_id: int +): + if mode_str == "off": + q.add_log_cmd(f"Sending Mode OFF to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.OFF, 0)) + elif mode_str == "on": + q.add_log_cmd(f"Sending Mode ON to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.ON, 0)) + elif mode_str == "normal": + q.add_log_cmd(f"Sending Mode NORMAL to {target_str}") + q.add_pus_tc(create_set_mode_cmd(apid, unique_id, Mode.NORMAL, 0)) diff --git a/pytmtc/pus_tc.py b/pytmtc/pus_tc.py index f1fe5c5..29d2058 100644 --- a/pytmtc/pus_tc.py +++ b/pytmtc/pus_tc.py @@ -1,143 +1,154 @@ -import datetime -import logging - -from spacepackets.ccsds import CdsShortTimestamp -from spacepackets.ecss import PusTelecommand -from spacepackets.seqcount import FileSeqCountProvider -from tmtccmd import ProcedureWrapper, TcHandlerBase -from tmtccmd.config import CmdTreeNode -from tmtccmd.pus import VerificationWrapper -from tmtccmd.tmtc import ( - DefaultPusQueueHelper, - FeedWrapper, - QueueWrapper, - SendCbParams, - TcProcedureType, - TcQueueEntryType, -) -from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd - -from pytmtc.acs import create_acs_node -from pytmtc.common import Apid -from pytmtc.acs.mgms import create_mgm_cmds - -_LOGGER = logging.getLogger(__name__) - - -class TcHandler(TcHandlerBase): - def __init__( - self, - seq_count_provider: FileSeqCountProvider, - verif_wrapper: VerificationWrapper, - ): - super(TcHandler, self).__init__() - self.seq_count_provider = seq_count_provider - self.verif_wrapper = verif_wrapper - self.queue_helper = DefaultPusQueueHelper( - queue_wrapper=QueueWrapper.empty(), - tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, - seq_cnt_provider=seq_count_provider, - pus_verificator=self.verif_wrapper.pus_verificator, - default_pus_apid=None, - ) - - def send_cb(self, send_params: SendCbParams): - entry_helper = send_params.entry - if entry_helper.is_tc: - if entry_helper.entry_type == TcQueueEntryType.PUS_TC: - pus_tc_wrapper = entry_helper.to_pus_tc_entry() - raw_tc = pus_tc_wrapper.pus_tc.pack() - _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") - send_params.com_if.send(raw_tc) - elif entry_helper.entry_type == TcQueueEntryType.LOG: - log_entry = entry_helper.to_log_entry() - _LOGGER.info(log_entry.log_str) - - def queue_finished_cb(self, info: ProcedureWrapper): - if info.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = info.to_tree_commanding_procedure() - _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") - - def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): - q = self.queue_helper - q.queue_wrapper = wrapper.queue_wrapper - if info.proc_type == TcProcedureType.TREE_COMMANDING: - def_proc = info.to_tree_commanding_procedure() - assert def_proc.cmd_path is not None - pack_pus_telecommands(q, def_proc.cmd_path) - - -def create_cmd_definition_tree() -> CmdTreeNode: - root_node = CmdTreeNode.root_node() - - hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True) - hk_node.add_child(CmdTreeNode("one_shot_hk", "Request One Shot HK set")) - hk_node.add_child( - CmdTreeNode("enable", "Enable periodic housekeeping data generation") - ) - hk_node.add_child( - CmdTreeNode("disable", "Disable periodic housekeeping data generation") - ) - - mode_node = CmdTreeNode("mode", "Mode Node", hide_children_for_print=True) - set_mode_node = CmdTreeNode( - "set_mode", "Set Node", hide_children_which_are_leaves=True - ) - set_mode_node.add_child(CmdTreeNode("off", "Set OFF Mode")) - set_mode_node.add_child(CmdTreeNode("on", "Set ON Mode")) - set_mode_node.add_child(CmdTreeNode("normal", "Set NORMAL Mode")) - mode_node.add_child(set_mode_node) - mode_node.add_child(CmdTreeNode("read_mode", "Read Mode")) - - test_node = CmdTreeNode("test", "Test Node") - test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC")) - test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event")) - root_node.add_child(test_node) - - scheduler_node = CmdTreeNode("scheduler", "Scheduler Node") - scheduler_node.add_child( - CmdTreeNode( - "schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds" - ) - ) - root_node.add_child(scheduler_node) - root_node.add_child(create_acs_node(mode_node, hk_node)) - return root_node - - -def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): - # It should always be at least the root path "/", so we split of the empty portion left of it. - cmd_path_list = cmd_path.split("/")[1:] - if len(cmd_path_list) == 0: - _LOGGER.warning("empty command path") - return - if cmd_path_list[0] == "test": - assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "ping": - q.add_log_cmd("Sending PUS ping telecommand") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1) - ) - elif cmd_path_list[1] == "trigger_event": - q.add_log_cmd("Triggering test event") - return q.add_pus_tc( - PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128) - ) - if cmd_path_list[0] == "scheduler": - assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "schedule_ping_10_secs_ahead": - q.add_log_cmd("Sending PUS scheduled TC telecommand") - crt_time = CdsShortTimestamp.from_now() - time_stamp = crt_time + datetime.timedelta(seconds=10) - time_stamp = time_stamp.pack() - return q.add_pus_tc( - create_time_tagged_cmd( - time_stamp, - PusTelecommand(service=17, subservice=1), - apid=Apid.SCHED, - ) - ) - if cmd_path_list[0] == "acs": - assert len(cmd_path_list) >= 2 - if cmd_path_list[1] == "mgms": - create_mgm_cmds(q, cmd_path_list) +import datetime +import logging + +from spacepackets.ccsds import CdsShortTimestamp +from spacepackets.ecss import PusTelecommand +from spacepackets.seqcount import FileSeqCountProvider +from tmtccmd import ProcedureWrapper, TcHandlerBase +from tmtccmd.config import CmdTreeNode +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tmtc import ( + DefaultPusQueueHelper, + FeedWrapper, + QueueWrapper, + SendCbParams, + TcProcedureType, + TcQueueEntryType, +) +from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd + +from pytmtc.acs import create_acs_node +from pytmtc.common import Apid +from pytmtc.acs.mgms import create_mgm_cmds + +_LOGGER = logging.getLogger(__name__) + + +class TcHandler(TcHandlerBase): + def __init__( + self, + seq_count_provider: FileSeqCountProvider, + verif_wrapper: VerificationWrapper, + ): + super(TcHandler, self).__init__() + self.seq_count_provider = seq_count_provider + self.verif_wrapper = verif_wrapper + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, + seq_cnt_provider=seq_count_provider, + pus_verificator=self.verif_wrapper.pus_verificator, + default_pus_apid=None, + ) + + def send_cb(self, send_params: SendCbParams): + entry_helper = send_params.entry + if entry_helper.is_tc: + if entry_helper.entry_type == TcQueueEntryType.PUS_TC: + pus_tc_wrapper = entry_helper.to_pus_tc_entry() + raw_tc = pus_tc_wrapper.pus_tc.pack() + _LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}") + send_params.com_if.send(raw_tc) + elif entry_helper.entry_type == TcQueueEntryType.LOG: + log_entry = entry_helper.to_log_entry() + _LOGGER.info(log_entry.log_str) + + def queue_finished_cb(self, info: ProcedureWrapper): + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() + _LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}") + + def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): + q = self.queue_helper + q.queue_wrapper = wrapper.queue_wrapper + if info.proc_type == TcProcedureType.TREE_COMMANDING: + def_proc = info.to_tree_commanding_procedure() + assert def_proc.cmd_path is not None + pack_pus_telecommands(q, def_proc.cmd_path) + + +def create_cmd_definition_tree() -> CmdTreeNode: + root_node = CmdTreeNode.root_node() + + # dummy stuff for testing + test_node = CmdTreeNode("test", "Test Node") + test_node.add_child(CmdTreeNode("ping", "Send PUS ping TC")) + test_node.add_child(CmdTreeNode("trigger_event", "Send PUS test to trigger event")) + root_node.add_child(test_node) + + # here we can see the different interfaces + # housekeeping (PUS 3) + hk_node = CmdTreeNode("hk", "Housekeeping Node", hide_children_for_print=True) + hk_node.add_child(CmdTreeNode("one_shot_hk", "Request One Shot HK set")) + hk_node.add_child( + CmdTreeNode("enable", "Enable periodic housekeeping data generation") + ) + hk_node.add_child( + CmdTreeNode("disable", "Disable periodic housekeeping data generation") + ) + + # mode (PUS 200) + mode_node = CmdTreeNode("mode", "Mode Node", hide_children_for_print=True) + # create the command to set any mode + set_mode_node = CmdTreeNode( + "set_mode", "Set Node", hide_children_which_are_leaves=True + ) + # adding the different modes + set_mode_node.add_child(CmdTreeNode("off", "Set OFF Mode")) + set_mode_node.add_child(CmdTreeNode("on", "Set ON Mode")) + set_mode_node.add_child(CmdTreeNode("normal", "Set NORMAL Mode")) + # after all mode children are added, the set_mode_node can be added to the parent: mode_node + mode_node.add_child(set_mode_node) # system mode commands + mode_node.add_child(CmdTreeNode("read_mode", "Read Mode")) # read the current system mode + + + + root_node.add_child(create_acs_node(mode_node, hk_node)) # we need to refer the hk & mode nodes to different subsystems or devices, as we want to use them to control a subsystem or device. + + # misc + scheduler_node = CmdTreeNode("scheduler", "Scheduler Node") + scheduler_node.add_child( + CmdTreeNode( + "schedule_ping_10_secs_ahead", "Schedule Ping to execute in 10 seconds" + ) + ) + root_node.add_child(scheduler_node) + return root_node + + +def pack_pus_telecommands(q: DefaultPusQueueHelper, cmd_path: str): + # It should always be at least the root path "/", so we split of the empty portion left of it. + cmd_path_list = cmd_path.split("/")[1:] + if len(cmd_path_list) == 0: + _LOGGER.warning("empty command path") + return + if cmd_path_list[0] == "test": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "ping": + q.add_log_cmd("Sending PUS ping telecommand") + return q.add_pus_tc( + PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=1) + ) + elif cmd_path_list[1] == "trigger_event": + q.add_log_cmd("Triggering test event") + return q.add_pus_tc( + PusTelecommand(apid=Apid.GENERIC_PUS, service=17, subservice=128) + ) + if cmd_path_list[0] == "scheduler": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "schedule_ping_10_secs_ahead": + q.add_log_cmd("Sending PUS scheduled TC telecommand") + crt_time = CdsShortTimestamp.from_now() + time_stamp = crt_time + datetime.timedelta(seconds=10) + time_stamp = time_stamp.pack() + return q.add_pus_tc( + create_time_tagged_cmd( + time_stamp, + PusTelecommand(service=17, subservice=1), + apid=Apid.SCHED, + ) + ) + if cmd_path_list[0] == "acs": + assert len(cmd_path_list) >= 2 + if cmd_path_list[1] == "mgms": + create_mgm_cmds(q, cmd_path_list) diff --git a/pytmtc/pus_tm.py b/pytmtc/pus_tm.py index ed55212..f29d8d1 100644 --- a/pytmtc/pus_tm.py +++ b/pytmtc/pus_tm.py @@ -1,93 +1,93 @@ -import logging -from typing import Any -from spacepackets.ccsds.time import CdsShortTimestamp -from spacepackets.ecss import PusTm -from spacepackets.ecss.pus_17_test import Service17Tm -from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams -from tmtccmd.logging.pus import RawTmtcTimedLogWrapper -from tmtccmd.pus import VerificationWrapper -from tmtccmd.tmtc import GenericApidHandlerBase - -from pytmtc.common import Apid, EventU32 -from pytmtc.hk import handle_hk_packet - - -_LOGGER = logging.getLogger(__name__) - - -class PusHandler(GenericApidHandlerBase): - def __init__( - self, - file_logger: logging.Logger, - verif_wrapper: VerificationWrapper, - raw_logger: RawTmtcTimedLogWrapper, - ): - super().__init__(None) - self.file_logger = file_logger - self.raw_logger = raw_logger - self.verif_wrapper = verif_wrapper - - def handle_tm(self, apid: int, packet: bytes, _user_args: Any): - try: - pus_tm = PusTm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - except ValueError as e: - _LOGGER.warning("Could not generate PUS TM object from raw data") - _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") - raise e - service = pus_tm.service - if service == 1: - tm_packet = Service1Tm.unpack( - data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) - ) - res = self.verif_wrapper.add_tm(tm_packet) - if res is None: - _LOGGER.info( - f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " - f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" - ) - _LOGGER.warning( - f"No matching telecommand found for {tm_packet.tc_req_id}" - ) - else: - self.verif_wrapper.log_to_console(tm_packet, res) - self.verif_wrapper.log_to_file(tm_packet, res) - elif service == 3: - pus_tm = PusTm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - handle_hk_packet(pus_tm) - elif service == 5: - tm_packet = PusTm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - src_data = tm_packet.source_data - event_u32 = EventU32.unpack(src_data) - _LOGGER.info( - f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" - ) - if event_u32.group_id == 0 and event_u32.unique_id == 0: - _LOGGER.info("Received test event") - elif service == 17: - tm_packet = Service17Tm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - if tm_packet.subservice == 2: - self.file_logger.info("Received Ping Reply TM[17,2]") - _LOGGER.info("Received Ping Reply TM[17,2]") - else: - self.file_logger.info( - f"Received Test Packet with unknown subservice {tm_packet.subservice}" - ) - _LOGGER.info( - f"Received Test Packet with unknown subservice {tm_packet.subservice}" - ) - else: - _LOGGER.info( - f"The service {service} is not implemented in Telemetry Factory" - ) - tm_packet = PusTm.unpack( - packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE - ) - self.raw_logger.log_tm(pus_tm) +import logging +from typing import Any +from spacepackets.ccsds.time import CdsShortTimestamp +from spacepackets.ecss import PusTm +from spacepackets.ecss.pus_17_test import Service17Tm +from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams +from tmtccmd.logging.pus import RawTmtcTimedLogWrapper +from tmtccmd.pus import VerificationWrapper +from tmtccmd.tmtc import GenericApidHandlerBase + +from pytmtc.common import Apid, EventU32 +from pytmtc.hk import handle_hk_packet + + +_LOGGER = logging.getLogger(__name__) + + +class PusHandler(GenericApidHandlerBase): + def __init__( + self, + file_logger: logging.Logger, + verif_wrapper: VerificationWrapper, + raw_logger: RawTmtcTimedLogWrapper, + ): + super().__init__(None) + self.file_logger = file_logger + self.raw_logger = raw_logger + self.verif_wrapper = verif_wrapper + + def handle_tm(self, apid: int, packet: bytes, _user_args: Any): + try: + pus_tm = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + except ValueError as e: + _LOGGER.warning("Could not generate PUS TM object from raw data") + _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") + raise e + service = pus_tm.service + if service == 1: + tm_packet = Service1Tm.unpack( + data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2) + ) + res = self.verif_wrapper.add_tm(tm_packet) + if res is None: + _LOGGER.info( + f"Received Verification TM[{tm_packet.service}, {tm_packet.subservice}] " + f"with Request ID {tm_packet.tc_req_id.as_u32():#08x}" + ) + _LOGGER.warning( + f"No matching telecommand found for {tm_packet.tc_req_id}" + ) + else: + self.verif_wrapper.log_to_console(tm_packet, res) + self.verif_wrapper.log_to_file(tm_packet, res) + elif service == 3: + pus_tm = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + handle_hk_packet(pus_tm) + elif service == 5: + tm_packet = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + src_data = tm_packet.source_data + event_u32 = EventU32.unpack(src_data) + _LOGGER.info( + f"Received event packet. Source APID: {Apid(tm_packet.apid)!r}, Event: {event_u32}" + ) + if event_u32.group_id == 0 and event_u32.unique_id == 0: + _LOGGER.info("Received test event") + elif service == 17: + tm_packet = Service17Tm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + if tm_packet.subservice == 2: + self.file_logger.info("Received Ping Reply TM[17,2]") + _LOGGER.info("Received Ping Reply TM[17,2]") + else: + self.file_logger.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + _LOGGER.info( + f"Received Test Packet with unknown subservice {tm_packet.subservice}" + ) + else: + _LOGGER.info( + f"The service {service} is not implemented in Telemetry Factory" + ) + tm_packet = PusTm.unpack( + packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE + ) + self.raw_logger.log_tm(pus_tm) diff --git a/requirements.txt b/requirements.txt index 9c558e3..81c6f00 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -. +. +spacepackets~=0.28.0 +tmtccmd~=8.2.0 \ No newline at end of file diff --git a/tests/test_tc_mods.py b/tests/test_tc_mods.py index 0b56bde..bf76c02 100644 --- a/tests/test_tc_mods.py +++ b/tests/test_tc_mods.py @@ -1,48 +1,48 @@ -from unittest import TestCase - -from spacepackets.ccsds import CdsShortTimestamp -from tmtccmd.tmtc import DefaultPusQueueHelper, QueueEntryHelper -from tmtccmd.tmtc.queue import QueueWrapper - -from pytmtc.config import SatrsConfigHook -from pytmtc.pus_tc import pack_pus_telecommands - - -class TestTcModules(TestCase): - def setUp(self): - self.hook = SatrsConfigHook(json_cfg_path="tmtc_conf.json") - self.queue_helper = DefaultPusQueueHelper( - queue_wrapper=QueueWrapper.empty(), - tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, - seq_cnt_provider=None, - pus_verificator=None, - default_pus_apid=None, - ) - - def test_cmd_tree_creation_works_without_errors(self): - cmd_defs = self.hook.get_command_definitions() - self.assertIsNotNone(cmd_defs) - - def test_ping_cmd_generation(self): - pack_pus_telecommands(self.queue_helper, "/test/ping") - queue_entry = self.queue_helper.queue_wrapper.queue.popleft() - entry_helper = QueueEntryHelper(queue_entry) - log_queue = entry_helper.to_log_entry() - self.assertEqual(log_queue.log_str, "Sending PUS ping telecommand") - queue_entry = self.queue_helper.queue_wrapper.queue.popleft() - entry_helper.entry = queue_entry - pus_tc_entry = entry_helper.to_pus_tc_entry() - self.assertEqual(pus_tc_entry.pus_tc.service, 17) - self.assertEqual(pus_tc_entry.pus_tc.subservice, 1) - - def test_event_trigger_generation(self): - pack_pus_telecommands(self.queue_helper, "/test/trigger_event") - queue_entry = self.queue_helper.queue_wrapper.queue.popleft() - entry_helper = QueueEntryHelper(queue_entry) - log_queue = entry_helper.to_log_entry() - self.assertEqual(log_queue.log_str, "Triggering test event") - queue_entry = self.queue_helper.queue_wrapper.queue.popleft() - entry_helper.entry = queue_entry - pus_tc_entry = entry_helper.to_pus_tc_entry() - self.assertEqual(pus_tc_entry.pus_tc.service, 17) - self.assertEqual(pus_tc_entry.pus_tc.subservice, 128) +from unittest import TestCase + +from spacepackets.ccsds import CdsShortTimestamp +from tmtccmd.tmtc import DefaultPusQueueHelper, QueueEntryHelper +from tmtccmd.tmtc.queue import QueueWrapper + +from pytmtc.config import SatrsConfigHook +from pytmtc.pus_tc import pack_pus_telecommands + + +class TestTcModules(TestCase): + def setUp(self): + self.hook = SatrsConfigHook(json_cfg_path="tmtc_conf.json") + self.queue_helper = DefaultPusQueueHelper( + queue_wrapper=QueueWrapper.empty(), + tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE, + seq_cnt_provider=None, + pus_verificator=None, + default_pus_apid=None, + ) + + def test_cmd_tree_creation_works_without_errors(self): + cmd_defs = self.hook.get_command_definitions() + self.assertIsNotNone(cmd_defs) + + def test_ping_cmd_generation(self): + pack_pus_telecommands(self.queue_helper, "/test/ping") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper = QueueEntryHelper(queue_entry) + log_queue = entry_helper.to_log_entry() + self.assertEqual(log_queue.log_str, "Sending PUS ping telecommand") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper.entry = queue_entry + pus_tc_entry = entry_helper.to_pus_tc_entry() + self.assertEqual(pus_tc_entry.pus_tc.service, 17) + self.assertEqual(pus_tc_entry.pus_tc.subservice, 1) + + def test_event_trigger_generation(self): + pack_pus_telecommands(self.queue_helper, "/test/trigger_event") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper = QueueEntryHelper(queue_entry) + log_queue = entry_helper.to_log_entry() + self.assertEqual(log_queue.log_str, "Triggering test event") + queue_entry = self.queue_helper.queue_wrapper.queue.popleft() + entry_helper.entry = queue_entry + pus_tc_entry = entry_helper.to_pus_tc_entry() + self.assertEqual(pus_tc_entry.pus_tc.service, 17) + self.assertEqual(pus_tc_entry.pus_tc.subservice, 128)