68 Commits

Author SHA1 Message Date
b9c8c9880f prep v0.3.4 2025-01-13 10:40:41 +01:00
7c14b2a2e0 prep v0.3.4 2025-01-13 10:38:07 +01:00
2403f3a019 prep v0.3.3 2025-01-13 10:33:51 +01:00
f52073d117 some minor updates for linting and formatting 2025-01-13 10:28:02 +01:00
2b31b91237 Merge pull request 'Event Parser Improvements' (#2) from meier/event-parser-improvements into main
Reviewed-on: #2
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
2025-01-09 12:30:16 +01:00
Jakob Meier
497781555d - supports definition of event IDs in hexadecimal format
- the subsystem ID can also be extracted when it is not defined separately in the header file
2024-12-22 18:16:59 +01:00
bbe55592ec prep v0.3.2 2023-03-24 15:37:36 +01:00
fe6c68d97b add explicit handling for duplicate event names
when writing the translation file
2023-03-24 15:33:11 +01:00
cada155b8e prep v0.3.1 2023-03-14 15:24:55 +01:00
c40db0c10d bump version 2023-02-19 13:20:09 +01:00
1f1d7ab62a add support for skipping retvals and events 2023-02-19 12:58:08 +01:00
98ecaba93a raise exception instead of exiting with useless error 2023-02-09 15:57:56 +01:00
66e31885a7 add changelog and remove logging module 2023-02-09 15:02:52 +01:00
b1e5a2d40a small tweak for updated fsfw 2022-08-24 17:26:49 +02:00
e84be4bb17 now it works again 2022-08-09 10:37:50 +02:00
fd9838bcba improved the file connector algorithm 2022-08-09 10:17:31 +02:00
9c412ace74 extended TODO 2022-07-20 11:04:53 +02:00
911aa0d89d added todo 2022-07-20 11:02:27 +02:00
a5dee6e417 improved csv format 2022-06-21 01:21:01 +02:00
a2e0c4f98e some more minor improvements 2022-06-21 00:57:01 +02:00
1b1ac86e8c more improvements and fixes for fsfwgen 2022-06-21 00:51:13 +02:00
36b44d1e26 new Path handling 2022-06-20 18:02:46 +02:00
fc191cc50e Refactor Path handling 2022-06-20 16:56:05 +02:00
6d423f7106 add retvals to possible pos args 2022-05-20 09:22:38 +02:00
169ad98cde move system folder 2022-05-03 16:07:06 +02:00
5ad9fb94af need more input for this 2022-03-30 20:20:30 +02:00
23990fbe8a start payload system yml 2022-03-30 20:18:43 +02:00
139a44285c reduced some dfts 2022-03-30 19:50:41 +02:00
52e942338a continued system description yml 2022-03-30 19:47:25 +02:00
1c8be25e18 minor format change 2022-03-30 17:11:48 +02:00
3da2cbc303 added some more transitions 2022-03-30 15:13:40 +02:00
8a353d391c continued system.yml file 2022-03-30 14:51:10 +02:00
a3ea5dd2e7 added additional choice for args parser 2022-03-22 20:42:30 +01:00
1c9f3eccef replace backslashes with regular slashes 2022-03-22 11:11:32 +01:00
8a2caf120d avoid duplicate events in translation file 2022-03-22 10:19:37 +01:00
8829f3face added first transition entries 2022-03-21 10:10:32 +01:00
bbfcaa6a1e finished system tree 2022-03-21 10:09:00 +01:00
c6105af3e3 create first system.yml file 2022-03-21 09:35:49 +01:00
Robin Mueller
5dbe33ceb8 README update 2022-03-11 11:56:49 +01:00
Robin Mueller
925733b7a5 update gitignore 2022-03-11 11:45:36 +01:00
Robin Mueller
aec15d0961 fsfwgen is a package now 2022-03-11 11:35:47 +01:00
Robin Mueller
c5ef1783a3 small bugfix 2022-03-07 17:10:00 +01:00
Robin Mueller
348877b5d9 check for duplicate full IDs now 2022-03-07 13:19:44 +01:00
Robin Mueller
52f291692c store event ID val as hex now as well 2022-03-04 11:01:49 +01:00
Robin Mueller
5c8895b631 rel path handling 2022-03-04 10:37:30 +01:00
Robin Mueller
330db88249 small form change 2022-03-04 10:26:14 +01:00
Robin Mueller
1531b8bda0 some bugfixes 2022-03-04 10:24:08 +01:00
bd76760052 some minor updates and bugfixes 2022-02-26 14:07:14 +01:00
8c51049821 small bugfix 2022-02-26 13:25:59 +01:00
1be773a20f improved logger and applied black formatter 2022-02-26 13:16:09 +01:00
24fa9a3fe3 fix in event parser 2022-02-03 17:31:18 +01:00
80312ed21e Merge branch 'main' of https://egit.irs.uni-stuttgart.de/fsfw/fsfw-generators 2022-02-03 17:13:56 +01:00
502f7d4f5e update output file format 2022-02-03 17:13:49 +01:00
39f2bbcabd update core 2021-10-21 14:04:50 +02:00
636670f7a0 expecting project string now 2021-08-02 13:02:33 +02:00
16280cc1df several bugfixes and improvements 2021-08-02 12:49:10 +02:00
8e805d2408 introduced logger 2021-07-31 20:49:38 +02:00
2043b92cd4 more checks in move function 2021-07-31 20:07:41 +02:00
4b494ae07c form improvements 2021-07-29 12:55:05 +02:00
c3fade12a9 important bugfix 2021-07-20 18:21:26 +02:00
593f866bfe header file name update 2021-07-20 18:11:01 +02:00
e6f5ff1812 important bugfix 2021-06-21 16:09:47 +02:00
f81c144252 description parsing working 2021-06-21 12:57:07 +02:00
9464739ae1 and more form changes 2021-06-21 12:54:03 +02:00
75969c45ee minor form changes 2021-06-21 12:52:01 +02:00
798efe2048 made some more code generic 2021-06-21 12:47:03 +02:00
f75892776e cleaned up a bit 2021-06-21 11:14:49 +02:00
78e890f947 renamed internal includes 2021-06-08 17:10:24 +02:00
27 changed files with 1434 additions and 860 deletions

.gitignore

@@ -1 +1,132 @@
__pycache__
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# PyCharm
.idea

CHANGELOG.md

@@ -0,0 +1,36 @@
Change Log
=======
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.3.4]
- Hotfixes for pyproject.toml file
# [v0.3.3]
- Fixes for event parsing
- Removed `setup.cfg` and `setup.py`
# [v0.3.2]
- Added handling for duplicate event names when writing the event translation
file.
# [v0.3.1]
- Sorted returnvalue export by raw returnvalue.
# [v0.3.0]
- Description parser is now more generic and parses a list of meta information entries.
- Added support for skip directive when parsing meta information.
# [v0.2.0]
- Remove `logging` module and move to more pythonic logging usage.

README.md

@@ -1,5 +1,38 @@
# FSFW Generators
Generic core module to generate various files or databases like the Mission Information Base (MIB),
returnvalue and event lists, mode tables for on-board software (OBSW) using the
Flight Software Framework (FSFW)
Generic Python module to generate source code or information for the
Flight Software Framework.
Currently, this includes the following helper modules:
1. `events` to generate Event translation source files and CSV lists
2. `returnvalues` to generate Returnvalue translation source files and CSV lists
3. `objects` to generate Object ID translation files and CSV lists
## Installing
It is recommended to use a virtual environment.
**Linux**
```sh
python3 -m venv venv
. venv/bin/activate
```
**Windows**
```sh
py -m venv venv
. venv/bin/activate
```
Then you can install the package with
```sh
python -m pip install .
```
You can add `-e` after `install` to perform an editable installation.
This is recommended if you are debugging, planning to extend the script, or
making changes.
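
For example, an editable installation from the repository root:

```sh
python -m pip install -e .
```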


@@ -1,267 +0,0 @@
import re
from modgen.parserbase.parser import FileParser
EVENT_ENTRY_NAME_IDX = 0
EVENT_ENTRY_SEVERITY_IDX = 1
EVENT_ENTRY_INFO_IDX = 2
EVENT_SOURCE_FILE_IDX = 3
FSFW_EVENT_HEADER_INCLUDE = "#include <fsfw/events/Event.h>"
DEFAULT_MOVING_WINDOWS_SIZE = 7
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
EVENT_NAME_IDX = 1
class SubsystemDefinitionParser(FileParser):
def __init__(self, file_list):
super().__init__(file_list)
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r")
for line in file.readlines():
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', line)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', moving_window[self.moving_window_center_idx])
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _post_parsing_operation(self):
pass
class EventParser(FileParser):
def __init__(self, file_list, interface_list):
super().__init__(file_list)
self.interfaces = interface_list
self.count = 0
self.my_id = 0
self.current_id = 0
self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
try:
file = open(file_name, 'r', encoding='utf-8')
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
all_lines = file.readlines()
total_count = 0
for line in all_lines:
self.__handle_line_reading(line, file_name)
if self.count > 0:
print("File " + file_name + " contained " + str(self.count) + " events.")
total_count += self.count
self.count = 0
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
subsystem_id_assignment_match = re.search(
rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);", moving_window[self.moving_window_center_idx]
)
if subsystem_id_assignment_match:
# For now, it is assumed that there is only going to be one subsystem ID per class / source file
# if "SUBSYSTEM_ID" in subsystem_id_assignment_match.group(1):
try:
self.current_id = self.interfaces[subsystem_id_assignment_match.group(2)][0]
self.my_id = self.return_number_from_string(self.current_id)
except KeyError as e:
print(f"Key not found: {e}")
# Now try to look for event definitions. The moving window allows multi-line event
# definitions. These two variants need to be checked
event_match = re.match(
r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*event::makeEvent[^\n]*",
moving_window[self.moving_window_center_idx]
)
macro_api_match = False
if not event_match:
event_match = re.match(
r"[\s]*static[\s]*const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*MAKE_EVENT[^\n]*",
moving_window[self.moving_window_center_idx]
)
if event_match:
macro_api_match = True
if event_match:
self.__handle_event_match(
event_match=event_match, macro_api_match=macro_api_match, moving_window=moving_window,
file_name=file_name
)
def __handle_event_match(self, event_match, macro_api_match: bool, moving_window: list, file_name: str):
event_full_match = False
if ";" in event_match.group(0):
event_full_match = self.__handle_one_line_event_match(
macro_api_match=macro_api_match, moving_window=moving_window
)
# Description will be parsed separately later
description = " "
if event_full_match:
name = event_match.group(EVENT_NAME_IDX)
if macro_api_match:
full_id = (self.my_id * 100) + self.return_number_from_string(event_full_match.group(2))
severity = event_full_match.group(3)
else:
full_id = (self.my_id * 100) + self.return_number_from_string(event_full_match.group(3))
severity = event_full_match.group(4)
self.mib_table.update({full_id: (name, severity, description, file_name)})
self.count = self.count + 1
def __handle_one_line_event_match(self, macro_api_match: bool, moving_window: list):
if macro_api_match:
# One line event definition.
regex_string = \
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
else:
regex_string = \
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;"
event_full_match = re.search(regex_string, moving_window[self.moving_window_center_idx])
return event_full_match
def _post_parsing_operation(self):
pass
def __handle_line_reading(self, line, file_name):
if not self.last_lines[0] == '\n':
twolines = self.last_lines[0] + ' ' + line.strip()
else:
twolines = ''
match1 = re.search(r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines)
if match1:
self.current_id = self.interfaces[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
self.my_id = self.return_number_from_string(self.current_id)
match = re.search(
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", twolines
)
if match:
if match.group(1):
self.last_lines[0] = line
return
description = " "
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
full_id = (self.my_id * 100) + self.return_number_from_string(match.group(3))
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
self.mib_table.update({full_id: (string_to_add, severity, description, file_name)})
self.count = self.count + 1
self.last_lines[0] = line
def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part)
if len(my_str) > 16:
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
my_str = my_str[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return my_str
@staticmethod
def return_number_from_string(a_string):
if a_string.startswith('0x'):
return int(a_string, 16)
elif a_string.isdigit():
return int(a_string)
else:
print('EventParser: Illegal number representation: ' + a_string)
return 0
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(description):
description = description.lstrip('//!<>')
description = description.lstrip()
if description == '':
description = ' '
return description
def export_to_file(filename: str, event_list: list, file_separator: str):
file = open(filename, "w")
for entry in event_list:
event_id = entry[0]
event_value = entry[1]
file.write(
str(event_id) + file_separator + event_value[EVENT_ENTRY_NAME_IDX] + file_separator +
event_value[EVENT_ENTRY_SEVERITY_IDX] + file_separator + event_value[EVENT_ENTRY_INFO_IDX] +
file_separator + event_value[EVENT_SOURCE_FILE_IDX] + '\n'
)
file.close()
return
def write_translation_source_file(event_list: list, date_string: str, filename: str = "translateEvents.cpp"):
outputfile = open(filename, "w")
definitions = ""
function = "const char * translateEvents(Event event) {\n\tswitch( (event & 0xffff) ) {\n"
for entry in event_list:
event_id = entry[0]
event_value = entry[1]
definitions += \
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING = \"{event_value[EVENT_ENTRY_NAME_IDX]}\";\n"
function += f"\tcase({event_id}):\n\t\treturn {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
function += '\tdefault:\n\t\treturn "UNKNOWN_EVENT";\n'
outputfile.write(
f"/**\n * @brief Auto-generated event translation file. Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
outputfile.write("#include \"translateEvents.h\"\n\n")
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
def write_translation_header_file(filename: str = "translateEvents.h"):
file = open(filename, "w")
file.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char * translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
"""
Generates the CSV in the same directory as the .py file and copies the CSV to another
directory if specified.
"""
print("EventParser: Exporting to file: " + file_name)
export_to_file(filename=file_name, event_list=event_list, file_separator=file_separator)
def handle_cpp_export(
event_list: list, date_string: str, file_name: str = "translateEvents.cpp", generate_header: bool = True,
header_file_name: str = "translateEvents.h"
):
print("EventParser: Generating translation cpp file.")
write_translation_source_file(event_list=event_list, date_string=date_string, filename=file_name)
if generate_header:
write_translation_header_file(filename=header_file_name)

fsfwgen/__init__.py

@@ -0,0 +1,5 @@
__version__ = "0.3.2"
VERSION_MAJOR = 0
VERSION_MINOR = 3
VERSION_REVISION = 2

fsfwgen/core.py

@@ -0,0 +1,25 @@
import enum
import argparse
class ParserTypes(enum.Enum):
EVENTS = "events"
OBJECTS = "objects"
RETVALS = "returnvalues"
SUBSERVICES = "subservices"
def init_printout(project_string: str):
print(f"-- {project_string} MIB Generator --")
def return_generic_args_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser("Arguments for FSFW MIB generation")
choices = ("events", "objects", "returnvalues", "retvals", "subservices", "all")
parser.add_argument(
"type",
metavar="type",
choices=choices,
help=f"Type of MIB data to generate. Choices: {choices}",
)
return parser
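
A minimal usage sketch for the argument parser above; the project string is a placeholder:

```python
# Minimal sketch driving the helpers from fsfwgen/core.py shown above.
# "EXAMPLE-OBSW" is a placeholder project string.
from fsfwgen.core import init_printout, return_generic_args_parser

init_printout("EXAMPLE-OBSW")  # prints "-- EXAMPLE-OBSW MIB Generator --"
parser = return_generic_args_parser()
args = parser.parse_args(["events"])  # any entry of the choices tuple above
print(args.type)  # -> "events"
```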


@@ -0,0 +1,319 @@
import re
import logging
from pathlib import Path
from typing import List, Optional, Dict
from fsfwgen.parserbase.parser import FileParser, MetaType
_LOGGER = logging.getLogger(__name__)
FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"'
DEFAULT_MOVING_WINDOWS_SIZE = 7
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
EVENT_NAME_IDX = 1
class SubsystemDefinitionParser(FileParser):
def __init__(self, file_list):
super().__init__(file_list)
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r")
for line in file.readlines():
match = re.search(r"([A-Z0-9_]*) = ([0-9]{1,3})", line)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _handle_file_parsing_moving_window(
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
match = re.search(
r"([A-Z0-9_]*) = ([0-9]{1,3})", moving_window[self.moving_window_center_idx]
)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _post_parsing_operation(self):
pass
class EventEntry:
def __init__(self, name: str, severity: str, description: str, file_name: Path):
self.name = name
self.severity = severity
self.description = description
self.file_name = file_name
def __repr__(self):
return (
f"EventEntry(name={self.name!r}, severity={self.severity!r}, "
f"description={self.description!r}, file_name={self.file_name!r}"
)
EventDictT = Dict[int, EventEntry]
class EventParser(FileParser):
def __init__(self, file_list: List[Path], interface_list, moving_window_size: int = 7):
super().__init__(file_list)
self.set_moving_window_mode(moving_window_size)
self.interfaces = interface_list
self.count = 0
self.my_id = 0
self.current_id = 0
self.mib_table: EventDictT = dict()
self.obsw_root_path: Optional[Path] = None
self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
_LOGGER.warning("Regular file parsing mode not implemented")
def _handle_file_parsing_moving_window(
self,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
subsystem_id_assignment_match = re.search(
rf"{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*)",
moving_window[self.moving_window_center_idx],
)
if subsystem_id_assignment_match:
# For now, it is assumed that there is only going to be one subsystem ID per
# class / source file
try:
self.current_id = self.interfaces[subsystem_id_assignment_match.group(1)][0]
self.my_id = self.return_number_from_string(self.current_id)
except KeyError as e:
print(f"Key not found: {e}")
# Now try to look for event definitions. The moving window allows multi-line event
# definitions. These two variants need to be checked
event_match = re.match(
r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=([^\n]*)",
moving_window[self.moving_window_center_idx],
)
macro_api_match = False
if event_match is not None:
valid_event = False
for idx in range(3):
if "MAKE_EVENT" in moving_window[self.moving_window_center_idx + idx]:
macro_api_match = True
valid_event = True
break
elif "makeEvent" in moving_window[self.moving_window_center_idx + idx]:
valid_event = True
break
if not valid_event:
event_match = False
if event_match:
self.__handle_event_match(
event_match=event_match,
macro_api_match=macro_api_match,
moving_window=moving_window,
file_name=file_name,
)
def __handle_event_match(
self, event_match, macro_api_match: bool, moving_window: list, file_name: Path
):
if ";" in event_match.group(0):
event_full_match = self.__generate_regex_event_match(
macro_api_match=macro_api_match,
full_string=moving_window[self.moving_window_center_idx],
)
else:
multi_line_string = self.__build_multi_line_event_string(
first_line=event_match.group(0), moving_window=moving_window
)
event_full_match = self.__generate_regex_event_match(
macro_api_match=macro_api_match, full_string=multi_line_string
)
description = "No description"
meta_list = self._search_for_descrip_string_generic(
moving_window=moving_window,
break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*",
)
for meta in meta_list:
if meta.type == MetaType.SKIP:
return
elif meta.type == MetaType.DESC:
description = meta.value
if event_full_match:
name = event_match.group(EVENT_NAME_IDX)
if macro_api_match:
full_id = (self.my_id * 100) + self.return_number_from_string(
event_full_match.group(2)
)
severity = event_full_match.group(3)
else:
if event_full_match.group(1) == "EV_REPLY_INVALID_SIZE":
print(f"Group 3: {event_full_match.group(3)}")
full_id = (self.my_id * 100) + self.return_number_from_string(
event_full_match.group(3)
)
severity = event_full_match.group(4)
if self.obsw_root_path is not None:
file_name = file_name.relative_to(self.obsw_root_path)
if self.mib_table.get(full_id) is not None:
_LOGGER.warning(f"Duplicate event ID {full_id} detected")
_LOGGER.info(
f"Name: {self.mib_table.get(full_id).name}| "
f"Description: {self.mib_table.get(full_id).description}"
)
self.mib_table.update({full_id: EventEntry(name, severity, description, file_name)})
self.count = self.count + 1
@staticmethod
def __generate_regex_event_match(macro_api_match: bool, full_string: str):
if macro_api_match:
# One line event definition.
regex_string = (
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
r"MAKE_EVENT\((0x[0-9a-fA-F]+|[0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
)
else:
regex_string = r"static const(?:expr)? Event\s*([\w]+)\s*=\s*event::makeEvent\(([\w:]+),\s*(0x[0-9a-fA-F]+|[0-9]{1,3})\s*,\s*severity::([A-Z]+)\)\s*;"
event_full_match = re.search(regex_string, full_string)
return event_full_match
def __build_multi_line_event_string(self, first_line: str, moving_window: List[str]) -> str:
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
)
def _post_parsing_operation(self):
pass
def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part)
if len(my_str) > 16:
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
my_str = my_str[0:14]
return my_str
@staticmethod
def return_number_from_string(a_string):
if a_string.startswith("0x"):
return int(a_string, 16)
elif a_string.isdigit():
return int(a_string)
else:
print("EventParser: Illegal number representation: " + a_string)
return 0
@staticmethod
def convert(name):
single_strings = name.split("_")
new_string = ""
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(description):
description = description.lstrip("//!<>")
description = description.lstrip()
if description == "":
description = " "
return description
def export_to_csv(filename: Path, event_list: EventDictT, col_sep: str):
with open(filename, "w") as out:
fsep = col_sep
out.write(
f"Event ID (dec){col_sep} Event ID (hex){col_sep} Name{col_sep} "
f"Severity{col_sep} Description{col_sep} File Path\n"
)
for entry in event_list.items():
event_id = int(entry[0])
event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}"
out.write(
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}"
f"{event_value.severity}{fsep}{event_value.description}"
f"{fsep}{event_value.file_name.as_posix()}\n"
)
def write_translation_source_file(
event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp"
):
with open(filename, "w") as out:
definitions = ""
# Look up table to avoid duplicate events
lut = dict()
function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
for entry in event_list.items():
event_id = entry[0]
event_value = entry[1]
name = event_value.name
if name in lut:
_LOGGER.warning(
"Duplicate name detected when generating event translation source file"
)
_LOGGER.warning(f"Name: {name}, Event Entry: {event_value}")
name = f"{name}_{event_id}"
_LOGGER.info(f"Created unique name {name}")
definitions += f"const char *{name}_STRING " f'= "{name}";\n'
function += f" case ({event_id}):\n " f"return {name}_STRING;\n"
lut.update({name: event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n'
out.write(
f"/**\n * @brief Auto-generated event translation file. "
f"Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
out.write('#include "translateEvents.h"\n\n')
out.write(definitions + "\n" + function + " }\n return 0;\n}\n")
def write_translation_header_file(filename: Path = "translateEvents.h"):
with open(filename, "w") as out:
out.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char *translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def handle_csv_export(file_name: Path, event_list: EventDictT, file_separator: str):
"""
Generates the CSV in the same directory as the .py file and copies the CSV to another
directory if specified.
"""
export_to_csv(filename=file_name, event_list=event_list, col_sep=file_separator)
def handle_cpp_export(
event_list: EventDictT,
date_string: str,
file_name: Path = "translateEvents.cpp",
generate_header: bool = True,
header_file_name: Path = "translateEvents.h",
):
write_translation_source_file(
event_list=event_list, date_string=date_string, filename=file_name
)
if generate_header:
write_translation_header_file(filename=header_file_name)
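
For context, a hypothetical end-to-end driver for the event parser defined above. The class and function names are taken from this file; the events module import path and the header directories are assumptions:

```python
# Hypothetical driver for the event parser above. FileListParser is part of
# this changeset; the events module import path and all directory/file paths
# are assumptions for illustration.
from pathlib import Path

from fsfwgen.parserbase.file_list_parser import FileListParser  # module name assumed
from fsfwgen.events.event_parser import (  # module path assumed
    EventParser,
    SubsystemDefinitionParser,
    handle_cpp_export,
    handle_csv_export,
)

# 1. Parse subsystem ID definitions into a name -> [raw ID string] table.
id_headers = FileListParser(Path("fsfw/events")).parse_header_files()
subsystem_ids = SubsystemDefinitionParser(id_headers).parse_files()

# 2. Parse event definitions with the moving-window parser. The full event ID
#    is subsystem ID * 100 + unique event number, as computed above.
event_headers = FileListParser(Path("mission")).parse_header_files(search_recursively=True)
event_dict = EventParser(event_headers, subsystem_ids).parse_files()

# 3. Export a CSV event list and the C++ translation files.
handle_csv_export(Path("events.csv"), event_dict, file_separator=";")
handle_cpp_export(event_dict, date_string="2025-01-13", file_name=Path("translateEvents.cpp"))
```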


@@ -0,0 +1,94 @@
import logging
import re
from pathlib import Path
from typing import List
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.utility.sql_writer import SqlWriter
_LOGGER = logging.getLogger(__name__)
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: List[Path]):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
if match:
self.mib_table.update({match.group(2): [match.group(1)]})
def _handle_file_parsing_moving_window(
self,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
pass
def _post_parsing_operation(self):
pass
def export_object_file(filename, object_list, file_separator: str = ","):
file = open(filename, "w")
for entry in object_list:
file.write(str(entry[0]) + file_separator + entry[1][0] + "\n")
file.close()
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
with open(filename, "w") as out:
_LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = (
"const char *translateObject(object_id_t object) "
"{\n switch ((object & 0xFFFFFFFF)) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
out.write(
f"/**\n * @brief Auto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
out.write('#include "translateObjects.h"\n\n')
out.write(definitions + "\n" + function + " return 0;\n}\n")
def write_translation_header_file(filename: str = "translateObjects.h"):
file = open(filename, "w")
file.write(
"#ifndef FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n"
"#define FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n\n"
"#include <fsfw/objectmanager/SystemObjectIF.h>\n\n"
"const char *translateObject(object_id_t object);\n\n"
"#endif /* FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_ */\n"
)
def sql_object_exporter(
object_table: list,
db_filename: str,
delete_cmd: str,
create_cmd: str,
insert_cmd: str,
):
sql_writer = SqlWriter(db_filename=db_filename)
sql_writer.delete(delete_cmd)
sql_writer.open(create_cmd)
for entry in object_table:
sql_writer.write_entries(insert_cmd, (entry[0], entry[1][0]))
sql_writer.commit()
sql_writer.close()
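
A hypothetical usage sketch for the object parser and exporters above; the module import path, header file, and output names are placeholders:

```python
# Hypothetical usage of ObjectDefinitionParser and the exporters above.
# The module import path, the header file and all output names are assumptions.
from pathlib import Path

from fsfwgen.objects.objects import (  # module path assumed
    ObjectDefinitionParser,
    export_object_file,
    sql_object_exporter,
)

object_headers = [Path("common/objects/systemObjectList.h")]  # placeholder header
object_table = ObjectDefinitionParser(object_headers).parse_files()  # "0x..." -> [name]
sorted_objects = sorted(object_table.items())

export_object_file("objects.csv", sorted_objects, file_separator=";")
sql_object_exporter(
    object_table=sorted_objects,
    db_filename="obsw.db",
    delete_cmd="DROP TABLE IF EXISTS Objects",
    create_cmd="CREATE TABLE IF NOT EXISTS Objects (id TEXT, name TEXT)",
    insert_cmd="INSERT INTO Objects VALUES (?, ?)",
)
```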


@@ -0,0 +1,66 @@
"""Generic File Parser class
Used to parse header files. Implemented as a class in case the header parser becomes more complex.
"""
import logging
from pathlib import Path
from typing import Union, List
_LOGGER = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
class FileListParser:
"""Generic header parser which takes a directory name or directory name list
and parses all included header files recursively.
TODO: Filter functionality for each directory to filter out files or folders
"""
def __init__(self, directory_list_or_name: Union[Path, List[Path]]):
self.directory_list = []
if isinstance(directory_list_or_name, Path):
self.directory_list.append(directory_list_or_name)
elif isinstance(directory_list_or_name, List):
self.directory_list.extend(directory_list_or_name)
else:
_LOGGER.warning(
"Header Parser: Passed directory list is not a header name or list of header names"
)
self.header_files = []
def parse_header_files(
self,
search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False,
) -> List[Path]:
"""This function is called to get a list of header files
:param search_recursively:
:param printout_string:
:param print_current_dir:
:return:
"""
print(printout_string, end="")
for directory in self.directory_list:
self.__get_header_file_list(directory, search_recursively, print_current_dir)
print(str(len(self.header_files)) + " header files were found.")
# g.PP.pprint(self.header_files)
return self.header_files
def __get_header_file_list(
self,
base_directory: Path,
search_recursively: bool = False,
print_current_dir: bool = False,
):
local_header_files = []
if print_current_dir:
print(f"Parsing header files in: {base_directory}")
for entry in base_directory.iterdir():
if entry.is_file() and entry.suffix == ".h" and entry.as_posix()[0] not in [".", "_"]:
local_header_files.append(entry)
if search_recursively:
if entry.is_dir():
self.__get_header_file_list(entry, search_recursively)
self.header_files.extend(local_header_files)
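
A short sketch of how the file list parser above might be used; the directories are placeholders and the module name is an assumption:

```python
# Minimal sketch: collect all header files below two directories (paths assumed).
from pathlib import Path

from fsfwgen.parserbase.file_list_parser import FileListParser  # module name assumed

lister = FileListParser([Path("fsfw/src"), Path("mission")])
headers = lister.parse_header_files(search_recursively=True, print_current_dir=True)
for header in headers:
    print(header.as_posix())
```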


@@ -1,6 +1,6 @@
#! /usr/bin/python3
"""
@file mib_packet_content_parser.py
@file packet_content_parser.py
@brief Generic File Parser class
@details
Used by the MIB Exporter. There are multiple functions which are abstract and should
@@ -11,14 +11,36 @@ Child classes fill out the MIB table (self.mib_table)
@author R. Mueller
@date 14.11.2019
"""
import dataclasses
import enum
import re
from abc import abstractmethod
from typing import Dict
from pathlib import Path
from typing import Dict, List
from enum import Enum
class FileParserModes(enum.Enum):
REGULAR = enum.auto(),
MOVING_WINDOW = enum.auto()
class VerbosityLevels(enum.Enum):
REDUCED = 0
REGULAR = 1
DEBUG = 2
class FileParserModes(Enum):
REGULAR = 1
MOVING_WINDOW = 2
class MetaType(enum.IntEnum):
SKIP = 1
DESC = 2
@dataclasses.dataclass
class MetaInfo:
type: MetaType
value: str = ""
class FileParser:
@@ -31,7 +53,8 @@ class FileParser:
3. Call parse_files. Additional arguments and keyword arguments can be supplied as well and
will be passed through to the abstract function implementations.
"""
def __init__(self, file_list):
def __init__(self, file_list: List[Path]):
if len(file_list) == 0:
print("File list is empty !")
self.file_list_empty = True
@@ -47,6 +70,7 @@ class FileParser:
self.__debug_moving_window = False
self.__debug_moving_window_filename = ""
self._moving_window_center_idx = 3
self._verbose_level = 1
@@ -70,6 +94,9 @@ class FileParser:
def set_verbosity(self, verbose_level: int):
self._verbose_level = verbose_level
def get_verbosity(self):
return self._verbose_level
def enable_moving_window_debugging(self, file_name: str):
self.__debug_moving_window = True
self.__debug_moving_window_filename = file_name
@@ -82,7 +109,7 @@ class FileParser:
:return: Returns the mib table dictionary.
"""
if self.file_list_empty:
print(f"Nothing to parse, supplied file list is empty!")
print("Nothing to parse, supplied file list is empty!")
return self.mib_table
if self.__parser_mode == FileParserModes.REGULAR:
@@ -99,7 +126,7 @@ class FileParser:
return self.mib_table
@abstractmethod
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""
Implemented by child class. The developer should fill the info table (self.mib_table)
in this routine
@@ -112,8 +139,14 @@ class FileParser:
@abstractmethod
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
self,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
"""
This will be called for the MOVING_WINDOW parser mode.
:param file_name: Current file name
@@ -133,7 +166,7 @@ class FileParser:
:return:
"""
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
def __parse_file_with_moving_window(self, file_name: Path, *args, **kwargs):
all_lines = self._open_file(file_name=file_name)
moving_window_size = self.__parser_args
if moving_window_size == 0:
@@ -150,8 +183,9 @@ class FileParser:
# More and more of the window is inside the file now
elif line_idx < moving_window_size:
for idx in range(line_idx, 0, -1):
moving_window[moving_window_size - 1 - idx] = \
moving_window[moving_window_size - idx]
moving_window[moving_window_size - 1 - idx] = moving_window[
moving_window_size - idx
]
moving_window[moving_window_size - 1] = line
# The full window is inside the file now.
elif line_idx >= moving_window_size:
@@ -159,7 +193,7 @@ class FileParser:
moving_window[idx] = moving_window[idx + 1]
moving_window[moving_window_size - 1] = line
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print(f"Moving window post line anaylsis line {line_idx}")
print(f"Moving window post line analysis line {line_idx}")
print(moving_window)
self._handle_file_parsing_moving_window(
file_name, line_idx, moving_window_size, moving_window, *args, **kwargs
@@ -168,7 +202,7 @@ class FileParser:
# the file are assigned an empty string until the window has moved out of file completely
for remaining_windows_idx in range(moving_window_size):
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print(f"Moving window pre line analysis post EOF")
print("Moving window pre line analysis post EOF")
print(moving_window)
num_entries_to_clear = remaining_windows_idx + 1
for idx_to_clear in range(num_entries_to_clear):
@@ -176,22 +210,73 @@ class FileParser:
for idx_to_reassign in range(moving_window_size - 1 - num_entries_to_clear):
moving_window[idx_to_reassign] = moving_window[idx_to_reassign + 1]
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print(f"Moving window post line anaylsis post EOF")
print("Moving window post line analysis post EOF")
print(moving_window)
pass
@staticmethod
def _open_file(file_name: str) -> list:
def _open_file(file_name: Path) -> list:
"""
Open a file, attempting common encodings utf-8 and cp1252
:param file_name:
:return:
"""
try:
file = open(file_name, 'r', encoding='utf-8')
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
print("ReturnValueParser: Decoding error with file " + file_name)
file = open(file_name, 'r', encoding='cp1252')
print(f"Parser: Decoding error with file {file_name}")
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
return all_lines
def _build_multi_line_string_generic(self, first_line: str, moving_window: List[str]) -> str:
"""This function transforms a multi line match into a one line match by searching for the
semicolon at the string end"""
all_lines = first_line.rstrip()
end_found = False
current_idx = self._moving_window_center_idx
while not end_found and current_idx < len(moving_window) - 1:
current_idx += 1
string_to_add = moving_window[current_idx].lstrip()
if ";" in moving_window[current_idx]:
all_lines += string_to_add
break
else:
string_to_add.rstrip()
all_lines += string_to_add
return all_lines
def _search_for_descrip_string_generic(
self, moving_window: List[str], break_pattern: str
) -> List[MetaInfo]:
current_idx = self._moving_window_center_idx - 1
# Look at the line above first
export_match = re.search(r"\[EXPORT]\s*:\s*\[(\w*)]", moving_window[current_idx])
if not export_match:
while True:
if re.search(break_pattern, moving_window[current_idx]):
break
export_match = re.search(r"\[EXPORT]\s*:\s*\[(\w*)]", moving_window[current_idx])
if export_match or current_idx <= 0:
break
current_idx -= 1
info = MetaInfo(MetaType.DESC)
if export_match:
if export_match.group(1).lower() == "comment":
current_build_idx = current_idx
descrip_string = ""
while current_build_idx < self._moving_window_center_idx:
string_to_add = moving_window[current_build_idx].lstrip()
string_to_add = string_to_add.lstrip("//!<>")
string_to_add = string_to_add.rstrip()
descrip_string += string_to_add
current_build_idx += 1
resulting_description = re.search(r"\[EXPORT]\s*:\s*\[\w*]\s(.*)", descrip_string)
if resulting_description:
info.value = resulting_description.group(1)
elif export_match.group(1).lower() == "skip":
info.type = MetaType.SKIP
return [info]
return []
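
The moving-window mode above slides a fixed-size window of lines over each file so that multi-line definitions can be matched around a center index. A simplified standalone illustration of the idea, without the fill-in and drain handling of the real implementation:

```python
# Simplified illustration of the moving-window idea used by FileParser above.
# The real parser also handles the window entering and leaving the file; this
# sketch only shows the steady-state sliding behaviour.
WINDOW_SIZE = 7
CENTER_IDX = 3

def sliding_windows(lines, size=WINDOW_SIZE):
    window = [""] * size
    for line in lines:
        window = window[1:] + [line.rstrip("\n")]
        yield window

for window in sliding_windows([f"line {idx}\n" for idx in range(10)]):
    print(window[CENTER_IDX])  # a parser would run its regexes on the center line
```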


@@ -0,0 +1,507 @@
import logging
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels, MetaInfo, MetaType
from fsfwgen.utility.printer import PrettyPrinter
_LOGGER = logging.getLogger(__name__)
# Intermediate solution
MAX_STRING_LEN = 80
PRINT_TRUNCATED_ENTRIES = False
DEBUG_FOR_FILE_NAME = False
CLASS_ID_NAMESPACE = "CLASS_ID"
DEFAULT_MOVING_WINDOWS_SIZE = 7
INVALID_IF_ID = -1
@dataclass
class FileStartHelper:
start_name: str
start_name_or_value: str
count: int
cumulative_start_index: Optional[int] = None
@dataclass
class FileEndHelper:
end_name: str
cumulative_end_value: Optional[int] = None
@dataclass
class FileConnHelper:
file_name: str
sh: Optional[FileStartHelper]
eh: Optional[FileEndHelper]
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
self.file_table_list = []
self.file_name_table = []
self.file_conn_helpers: Optional[List[FileConnHelper]] = None
self._debug_mode = False
def enable_debug_mode(self, enable: bool):
self._debug_mode = enable
def _handle_file_parsing_moving_window(
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
pass
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
self.file_name_table.append(file_name)
try:
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
self.__handle_regular_class_id_parsing(file_name=file_name, all_lines=all_lines)
def __handle_regular_class_id_parsing(self, file_name: str, all_lines: List[str]):
count = 0
current_file_table = dict()
start_matched = False
end_matched = False
start_name = ""
first_entry_name_or_index = ""
file_conn_entry = FileConnHelper(file_name, None, None)
for line in all_lines:
if not start_matched:
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
first_entry_name_or_index = match.group(2)
start_matched = True
else:
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
if match:
count += 1
# It is expected that the last entry is explicitly marked like this.
# TODO: Could also simply remember last entry and then designate that as end
# entry as soon as "}" is found. Requires moving window mode though
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
last_entry_name = match.group(1)
end_matched = True
file_conn_entry.eh = FileEndHelper(last_entry_name, None)
break
else:
short_name = match.group(2)
if short_name == "":
short_name = match.group(1)[0:3]
current_file_table.update({count: [match.group(1), short_name]})
if not start_matched:
print("No start match detected when parsing interface files..")
print(f"Current file: {file_name} | Make sure to include a start definition")
sys.exit(1)
if not end_matched:
raise ValueError(
f"No end match detected when parsing interface file {file_name}. "
f"Make sure to use [EXPORT] : [END]"
)
file_conn_entry.sh = FileStartHelper(start_name, first_entry_name_or_index, count, None)
if self.file_conn_helpers is None:
self.file_conn_helpers = []
self.file_conn_helpers.append(file_conn_entry)
self.file_name_table.append(file_name)
self.file_table_list.append(current_file_table)
def _post_parsing_operation(self):
self.__assign_start_end_indexes()
self._print_start_end_info()
for idx, file_table in enumerate(self.file_table_list):
self.__build_mod_interface_table(
self.file_conn_helpers[idx].sh.cumulative_start_index, file_table
)
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
def _print_start_end_info(self):
for conn_helper in self.file_conn_helpers:
print(
f"Detected {conn_helper.sh.count} entries in {conn_helper.file_name}, "
f"end index {conn_helper.eh.cumulative_end_value}"
)
def __assign_start_end_indexes(self):
conn_helpers_old = self.file_conn_helpers.copy()
all_indexes_filled = False
max_outer_iterations = 15
current_iteration = 0
while not all_indexes_filled:
for idx, conn_helper in enumerate(conn_helpers_old):
sh = conn_helper.sh
# In the very first file, the first index might/will be a number
if sh.start_name_or_value.isdigit():
sh.cumulative_start_index = int(sh.start_name_or_value)
conn_helpers_old[idx].eh.cumulative_end_value = (
sh.cumulative_start_index + sh.count
)
# Now, we try to connect the start and end of the files using the start and end
# names respectively
end_name_to_search = conn_helper.sh.start_name_or_value
for end_name_helper in conn_helpers_old:
eh = end_name_helper.eh
if eh.end_name == end_name_to_search and eh.cumulative_end_value is not None:
self.file_conn_helpers[
idx
].sh.cumulative_start_index = eh.cumulative_end_value
self.file_conn_helpers[idx].eh.cumulative_end_value = (
eh.cumulative_end_value + self.file_conn_helpers[idx].sh.count
)
all_indexes_filled = True
for idx, conn_helper in enumerate(conn_helpers_old):
if (
conn_helper.sh.cumulative_start_index is None
or conn_helper.eh.cumulative_end_value is None
):
all_indexes_filled = False
current_iteration += 1
if current_iteration >= max_outer_iterations:
print(
"Could not fill out start and end index list in "
"given number of maximum outer iterations!"
)
sys.exit(1)
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
dict_to_build = dict()
for local_count, interface_name_and_shortname in interface_dict.items():
dict_to_build.update(
{
interface_name_and_shortname[0]: [
local_count + count_start,
interface_name_and_shortname[1],
]
}
)
self.mib_table.update(dict_to_build)
class RetvalEntry:
def __init__(
self,
name: str,
description: str,
unique_id: int,
file_name: Path,
subsystem_name: str,
):
self.name = name
self.description = description
self.unique_id = unique_id
self.file_name = file_name
self.subsystem_name = subsystem_name
def __repr__(self):
return (
f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
f"subsystem_name={self.subsystem_name!r})"
)
RetvalDictT = Dict[int, RetvalEntry]
class ReturnValueParser(FileParser):
"""
Generic return value parser.
"""
def __init__(self, interfaces, file_list, print_tables):
super().__init__(file_list)
self.print_tables = print_tables
self.interfaces = interfaces
self.return_value_dict = dict()
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.obsw_root_path: Optional[Path] = None
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
self.return_value_dict.update(
{
0: RetvalEntry(
name="OK",
description="System-wide code for ok.",
unique_id=0,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
self.return_value_dict.update(
{
1: RetvalEntry(
name="Failed",
description="Unspecified system-wide code for failed.",
unique_id=1,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""Former way to parse returnvalues. Not recommended anymore.
:param file_name:
:param args:
:param kwargs:
:return:
"""
if len(args) > 0:
print_truncated_entries = args[0]
else:
print_truncated_entries = False
all_lines = self._open_file(file_name=file_name)
for line in all_lines:
self.__handle_line_reading(line, file_name, print_truncated_entries)
def _handle_file_parsing_moving_window(
self,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
"""Parse for returnvalues using a moving window"""
interface_id_match = re.search(
rf"{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)",
moving_window[self._moving_window_center_idx],
)
if interface_id_match:
self.__handle_interfaceid_match(
interface_id_match=interface_id_match, file_name=file_name
)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)?[\s]*ReturnValue_t[\s]*([\w]*)[\s]*=[\s]*((?!;).*$)",
moving_window[self._moving_window_center_idx],
re.DOTALL,
)
full_returnvalue_string = ""
if returnvalue_match:
if ";" in returnvalue_match.group(0):
full_returnvalue_string = returnvalue_match.group(0)
else:
full_returnvalue_string = self.__build_multi_line_returnvalue_string(
moving_window=moving_window,
first_line=moving_window[self._moving_window_center_idx],
)
number_match = INVALID_IF_ID
# Try to match for a string using the new API first. Example:
# static const ReturnValue_t PACKET_TOO_LONG =
# returnvalue::makeCode(CLASS_ID, 0);
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
full_returnvalue_string,
)
if not returnvalue_match:
# Try to match for old API using MAKE_RETURN_CODE macro
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)",
full_returnvalue_string,
)
if returnvalue_match:
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(2))
else:
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(3))
if returnvalue_match:
description = "No description"
meta_list = self.__search_for_descrip_string(moving_window=moving_window)
for meta in meta_list:
if meta.type == MetaType.DESC:
description = meta.value
elif meta.type == MetaType.SKIP:
return
if number_match == INVALID_IF_ID:
_LOGGER.warning(f"Invalid number match detected for file {file_name}")
_LOGGER.warning("Match groups:")
for group in returnvalue_match.groups():
_LOGGER.info(group)
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1),
file_name=file_name,
number_match=number_match,
description=description,
)
def __build_multi_line_returnvalue_string(
self, first_line: str, moving_window: List[str]
) -> str:
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
)
def __search_for_descrip_string(self, moving_window: List[str]) -> List[MetaInfo]:
return self._search_for_descrip_string_generic(
moving_window=moving_window,
break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t",
)
def __handle_line_reading(self, line, file_name, print_truncated_entries: bool):
newline = line
if self.last_lines[0] != "\n":
two_lines = self.last_lines[0] + " " + newline.strip()
else:
two_lines = ""
interface_id_match = re.search(
r"INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)", two_lines
)
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match, file_name=file_name)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)",
two_lines,
)
if returnvalue_match:
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1),
file_name=file_name,
description="",
number_match=get_number_from_dec_or_hex_str(returnvalue_match.group(2)),
)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
def __handle_interfaceid_match(self, interface_id_match, file_name: Path) -> bool:
"""Handle a match of an interface ID definition in the code.
Returns whether the interface ID was found successfully in the IF ID header files
"""
if self.get_verbosity() == VerbosityLevels.DEBUG:
_LOGGER.info(f"Interface ID {interface_id_match.group(1)} found in {file_name}")
if_id_entry = self.interfaces.get(interface_id_match.group(1))
if if_id_entry is not None:
self.current_interface_id_entries["ID"] = if_id_entry[0]
else:
_LOGGER.warning(
f"Interface ID {interface_id_match.group(1)} not found in IF ID dictionary"
)
return False
self.current_interface_id_entries["Name"] = self.interfaces[interface_id_match.group(1)][
1
].lstrip()
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
if self.get_verbosity() == VerbosityLevels.DEBUG:
current_id = self.current_interface_id_entries["ID"]
_LOGGER.info(f"Current ID: {current_id}")
return True
def __handle_returnvalue_match(
self, name_match: str, number_match: int, file_name: Path, description: str
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"],
name_match,
MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES,
)
full_id = (self.current_interface_id_entries["ID"] << 8) | number_match
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = file_name.relative_to(self.obsw_root_path)
mib_entry = RetvalEntry(
name=string_to_add,
description=description,
unique_id=number_match,
file_name=file_name,
subsystem_name=self.current_interface_id_entries["FullName"],
)
self.return_value_dict.update({full_id: mib_entry})
self.count = self.count + 1
def _post_parsing_operation(self):
if self.print_tables:
PrettyPrinter.pprint(self.return_value_dict)
self.mib_table = self.return_value_dict
@staticmethod
def export_to_csv(filename: Path, list_of_entries: RetvalDictT, column_sep: str):
with open(filename, "w") as out:
out.write(
f"Full ID (hex){column_sep} Name{column_sep} Description{column_sep} "
f"Unique ID{column_sep} Subsytem Name{column_sep} File Path\n"
)
for k, entry in sorted(list_of_entries.items()):
# entry: RetvalEntry
if column_sep == ";":
entry.description = entry.description.replace(";", ",")
elif column_sep == ",":
# Quote the description
entry.description = f'"{entry.description}"'
out.write(
f"{k:#06x}{column_sep}{entry.name}{column_sep}{entry.description}"
f"{column_sep}{entry.unique_id}{column_sep}{entry.subsystem_name}"
f"{column_sep}{entry.file_name.as_posix()}\n"
)
def build_checked_string(
self,
first_part,
second_part,
max_string_len: int,
print_truncated_entries: bool,
):
"""Build a checked string"""
my_str = first_part + "_" + self.convert(second_part)
if len(my_str) > max_string_len:
if print_truncated_entries:
_LOGGER.warning(f"Entry {my_str} too long. Will truncate.")
my_str = my_str[0:max_string_len]
else:
# print("Entry: " + myStr + " is all right.")
pass
return my_str
@staticmethod
def convert(name):
single_strings = name.split("_")
new_string = ""
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(descr_string):
description = descr_string.lstrip("!<- ")
if description == "":
description = " "
return description
def get_number_from_dec_or_hex_str(a_string):
if a_string.startswith("0x"):
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
_LOGGER.warning(f"Illegal number representation: {a_string}")
return 0
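
As a worked example of the ID composition used by __handle_returnvalue_match above (the values are made up):

```python
# Worked example of the full returnvalue ID composition used above: the
# interface (class) ID occupies the upper byte, the unique ID the lower byte.
interface_id = 0x21  # made-up CLASS_ID value
unique_id = 5  # made-up unique returnvalue number
full_id = (interface_id << 8) | unique_id
print(f"{full_id:#06x}")  # -> 0x2105, formatted like the CSV export above
```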


@@ -1,23 +1,16 @@
#! /usr/bin/python3
"""
@file
mib_packet_content_parser.py
@brief
CSV Writer
@details
This class writes tables to a csv.
@author
R. Mueller
@date
14.11.2019
"""
from modgen.utility.file_management import copy_file, move_file
from pathlib import Path
from fsfwgen.utility.file_management import copy_file, move_file
# TODO: Export to SQL
class CsvWriter:
def __init__(
self, filename: str, table_to_print=None, header_array=None, file_separator: str = ","
self,
filename: Path,
table_to_print=None,
header_array=None,
file_separator: str = ",",
):
if header_array is None:
header_array = []
@@ -35,7 +28,7 @@ class CsvWriter:
file.write("Index" + self.file_separator)
for index in range(self.column_numbers):
# noinspection PyTypeChecker
if index < len(self.header_array)-1:
if index < len(self.header_array) - 1:
file.write(self.header_array[index] + self.file_separator)
else:
file.write(self.header_array[index] + "\n")
@@ -49,11 +42,11 @@ class CsvWriter:
file.write(str(entry[columnIndex]) + "\n")
file.close()
def copy_csv(self, copy_destination: str = "."):
def copy_csv(self, copy_destination: Path = "."):
copy_file(self.filename, copy_destination)
print("CSV file was copied to " + copy_destination)
print(f"CSV file was copied to {copy_destination}")
def move_csv(self, move_destination: str):
def move_csv(self, move_destination: Path):
move_file(self.filename, move_destination)
if move_destination == ".." or move_destination == "../":
print("CSV Writer: CSV file was moved to parser root directory")


@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import logging
import shutil
import os
from pathlib import Path
_LOGGER = logging.getLogger(__name__)
def copy_file(filename: Path, destination: Path = "", delete_existing_file: bool = False):
if not os.path.exists(filename):
_LOGGER.warning(f"File {filename} does not exist")
return
if not os.path.isdir(destination) and os.path.exists(destination):
if delete_existing_file:
os.remove(destination)
else:
_LOGGER.warning(f"Destination file {destination} already exists")
return
try:
shutil.copy2(src=filename, dst=destination)
except FileNotFoundError:
_LOGGER.exception("File not found!")
except shutil.SameFileError:
_LOGGER.exception("Source and destination are the same!")
def move_file(file_name: Path, destination: Path = Path(".")):
if not os.path.exists(file_name):
_LOGGER.warning(f"move_file: File {file_name} does not exist")
return
if not os.path.exists(destination):
_LOGGER.warning(f"move_file: Destination directory {destination} does not exist")
return
try:
shutil.copy2(file_name, destination)
os.remove(file_name)
return
except FileNotFoundError:
_LOGGER.exception("File not found!")
except shutil.SameFileError:
_LOGGER.exception("Source and destination are the same!")

View File

@@ -13,4 +13,3 @@ class Printer:
print(leading_string)
PrettyPrinter.pprint(dictionary)
print("\r\n", end="")

View File

@@ -1,5 +1,8 @@
import logging
import sqlite3
_LOGGER = logging.getLogger(__name__)
class SqlWriter:
def __init__(self, db_filename: str):
@@ -7,11 +10,11 @@ class SqlWriter:
self.conn = sqlite3.connect(self.filename)
def open(self, sql_creation_command: str):
print("SQL Writer: Opening " + self.filename)
_LOGGER.info(f"SQL Writer: Opening {self.filename}")
self.conn.execute(sql_creation_command)
def delete(self, sql_deletion_command):
print("SQL Writer: Deleting SQL table")
_LOGGER.info("SQL Writer: Deleting SQL table")
self.conn.execute(sql_deletion_command)
def write_entries(self, sql_insertion_command, current_entry):
@@ -20,13 +23,15 @@ class SqlWriter:
return cur.lastrowid
def commit(self):
print("SQL Writer: Commiting SQL table")
_LOGGER.info("SQL Writer: Commiting SQL table")
self.conn.commit()
def close(self):
self.conn.close()
def sql_writing_helper(self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str=""):
def sql_writing_helper(
self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = ""
):
if deletion_cmd != "":
self.delete(deletion_cmd)
self.open(creation_cmd)
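The low-level methods shown above compose as follows; a sketch with illustrative SQL and data:
from fsfwgen.utility.sql_writer import SqlWriter

writer = SqlWriter("mib.db")
writer.delete("DROP TABLE IF EXISTS events")
writer.open("CREATE TABLE events (id INTEGER, name TEXT)")
writer.write_entries("INSERT INTO events VALUES (?, ?)", (0x2C01, "SOME_EVENT"))
writer.commit()
writer.close()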

View File

@@ -1,76 +0,0 @@
import re
from modgen.parserbase.parser import FileParser
from modgen.utility.sql_writer import SqlWriter
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: list):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
match = re.search(r'([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)', line)
if match:
self.mib_table.update({match.group(2): [match.group(1)]})
def _handle_file_parsing_moving_window(self, file_name: str, current_line: int, moving_window_size: int,
moving_window: list, *args, **kwargs):
pass
def _post_parsing_operation(self):
pass
def export_object_file(filename, object_list, file_separator: str = ","):
file = open(filename, "w")
for entry in object_list:
file.write(str(entry[0]) + file_separator + entry[1][0] + '\n')
file.close()
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
outputfile = open(filename, "w")
print('ObjectParser: Writing translation file ' + filename)
definitions = ""
function = "const char* translateObject(object_id_t object) {\n\tswitch( (object & 0xFFFFFFFF) ) {\n"
for entry in list_of_entries:
# first part of translate file
definitions += f"const char *{entry[1][0]}_STRING = \"{entry[1][0]}\";\n"
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f"\tcase {entry[0]}:\n\t\treturn {entry[1][0]}_STRING;\n"
function += '\tdefault:\n\t\treturn "UNKNOWN_OBJECT";\n'
outputfile.write(f"/**\n * @brief\tAuto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n")
outputfile.write("#include \"translateObjects.h\"\n\n")
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
def write_translation_header_file(filename: str = "translateObjects.h"):
file = open(filename, "w")
file.write(
f"#ifndef FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n"
f"#define FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n\n"
f"#include <fsfw/objectmanager/SystemObjectIF.h>\n\n"
f"const char* translateObject(object_id_t object);\n\n"
f"#endif /* FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_ */\n"
)
def sql_object_exporter(
object_table: list, db_filename: str, delete_cmd: str, create_cmd: str, insert_cmd: str
):
sql_writer = SqlWriter(db_filename=db_filename)
sql_writer.delete(delete_cmd)
sql_writer.open(create_cmd)
for entry in object_table:
sql_writer.write_entries(
insert_cmd, (entry[0], entry[1][0])
)
sql_writer.commit()
sql_writer.close()
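For context, the removed parser collected NAME = 0x... definitions with a single regular expression; a quick demonstration of what it matched (the object name and ID are invented):
import re

line = "PUS_SERVICE_17 = 0x53000017;"
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
assert match is not None
assert match.group(1) == "PUS_SERVICE_17"  # object name
assert match.group(2) == "0x53000017"      # object ID, used as the mib table key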

View File

@@ -1,71 +0,0 @@
"""
@file file_list_parser.py
@brief Generic File Parser class
@details
Used by parse header files. Implemented as class in case header parser becomes more complex
@author R. Mueller
@date 22.11.2019
"""
import os
import re
from typing import Union
# pylint: disable=too-few-public-methods
class FileListParser:
"""
Generic header parser which takes a directory name or directory name list
and parses all included header files recursively.
"""
def __init__(self, directory_list_or_name: Union[str, list]):
if isinstance(directory_list_or_name, str):
self.directory_list = [directory_list_or_name]
elif isinstance(directory_list_or_name, list):
self.directory_list = directory_list_or_name
else:
print("Header Parser: Passed directory list is not a header name or list of "
"header names")
self.header_files = []
def parse_header_files(self, search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False):
"""
This function is called to get a list of header files
:param search_recursively:
:param printout_string:
:param print_current_dir:
:return:
"""
print(printout_string, end="")
for directory in self.directory_list:
self.__get_header_file_list(directory, search_recursively, print_current_dir)
print(str(len(self.header_files)) + " header files were found.")
# g.PP.pprint(self.header_files)
return self.header_files
def __get_header_file_list(self, base_directory: str, seach_recursively: bool = False,
print_current_dir: bool = False):
if base_directory[-1] != '/':
base_directory += '/'
local_header_files = []
if print_current_dir:
print("Parsing header files in: " + base_directory)
base_list = os.listdir(base_directory)
# g.PP.pprint(base_list)
for entry in base_list:
header_file_match = re.match(r"[_.]*.*\.h", entry)
if header_file_match:
if os.path.isfile(base_directory + entry):
match_string = header_file_match.group(0)
if match_string[0] == '.' or match_string[0] == '_':
pass
else:
local_header_files.append(base_directory + entry)
if seach_recursively:
next_path = base_directory + entry
if os.path.isdir(next_path):
self.__get_header_file_list(next_path, seach_recursively)
# print("Files found in: " + base_directory)
# g.PP.pprint(local_header_files)
self.header_files.extend(local_header_files)
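This deletion lines up with the pathlib refactor; a rough modern equivalent of the removed collection logic, sketched with pathlib (not the actual replacement code):
from pathlib import Path
from typing import List

def collect_header_files(base_dir: Path, recursive: bool = True) -> List[Path]:
    pattern = "**/*.h" if recursive else "*.h"
    # Skip names starting with '.' or '_', as the old regex filter did.
    return [
        p for p in base_dir.glob(pattern)
        if p.is_file() and not p.name.startswith((".", "_"))
    ]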

pyproject.toml Normal file
View File

@@ -0,0 +1,44 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"
[project]
name = "fsfwgen"
description = "FSFW Generator Core"
version = "0.3.4"
license = { text = "Apache-2.0" }
authors = [
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"}
]
classifiers = [
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
"Natural Language :: English",
"Operating System :: POSIX",
"Operating System :: Microsoft :: Windows",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Topic :: Communications",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Scientific/Engineering"
]
dependencies = [
"colorlog~=6.0"
]
[project.urls]
"Homepage" = "https://egit.irs.uni-stuttgart.de/fsfw/fsfwgen"
[tool.ruff]
exclude = [
".git",
"venv",
"docs"
]
line-length = 100

requirements.txt Normal file
View File

@@ -0,0 +1 @@
colorlog>=5.0.0

View File

@@ -1,373 +0,0 @@
import re
import sys
from typing import List, Tuple
from modgen.parserbase.parser import FileParser
from modgen.utility.printer import PrettyPrinter
# Intermediate solution
MAX_STRING_LEN = 80
PRINT_TRUNCATED_ENTRIES = False
DEBUG_INTERFACE_ID_COMPARISON = False
DEBUG_FOR_FILE_NAME = False
DEBUG_FILE_NAME = "RingBufferAnalyzer"
CLASS_ID_NAMESPACE = "CLASS_ID"
DEFAULT_MOVING_WINDOWS_SIZE = 7
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
self.file_table_list = []
self.file_name_table = []
self.start_name_list = []
self.end_name_list = []
def _handle_file_parsing_moving_window(self, file_name: str, current_line: int,
moving_window_size: int, moving_window: list, *args,
**kwargs):
pass
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
self.file_name_table.append(file_name)
try:
file = open(file_name, 'r', encoding='utf-8')
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
all_lines = file.readlines()
self.__handle_regular_class_id_parsing(file_name=file_name, all_lines=all_lines)
def __handle_regular_class_id_parsing(self, file_name: str, all_lines: List[str]):
count = 0
current_file_table = dict()
start_matched = False
end_matched = False
start_name = ""
target_end_name = ""
for line in all_lines:
if not start_matched:
match = re.search(r'[\s]*([\w]*) = [\s]*([\w]*)', line)
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
target_end_name = match.group(2)
start_matched = True
else:
match = re.search(r'[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?', line)
if match:
count += 1
if re.search(r"\[EXPORT\][\s]*:[\s]*\[END\]", match.group(2)):
last_entry_name = match.group(1)
end_matched = True
self.end_name_list.append([last_entry_name, None])
else:
short_name = match.group(2)
if short_name == "":
short_name = match.group(1)[0:3]
current_file_table.update({count: [match.group(1), short_name]})
if not start_matched:
print("No start match detected when parsing interface files..")
print(f"Current file: {file_name} | Make sure to include a start definition")
sys.exit(1)
if not end_matched:
print("No end match detected when parsing interface files. Make sure to use [EXPORT] : [END]")
sys.exit(1)
self.start_name_list.append([start_name, target_end_name, None, count])
self.file_name_table.append(file_name)
self.file_table_list.append(current_file_table)
def _post_parsing_operation(self):
self.start_name_list, self.end_name_list = self.__assign_start_end_indexes(
self.start_name_list, self.end_name_list
)
for idx, file_table in enumerate(self.file_table_list):
self.__build_mod_interface_table(self.start_name_list[idx][2], file_table)
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
@staticmethod
def __assign_start_end_indexes(start_name_list_list, end_name_list_list) -> Tuple[List, List]:
start_list_list_completed = start_name_list_list
end_list_list_completed = end_name_list_list
all_indexes_filled = False
max_outer_iterations = 15
current_iteration = 0
while not all_indexes_filled:
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[1].isdigit():
start_list_list_completed[idx][2] = int(start_name_list[1])
end_list_list_completed[idx][1] = \
start_list_list_completed[idx][2] + start_list_list_completed[idx][3]
target_end_name = start_name_list[1]
for end_name_list in end_list_list_completed:
end_name = end_name_list[0]
end_value = end_name_list[1]
if end_name == target_end_name and end_value is not None:
start_list_list_completed[idx][2] = end_value
end_list_list_completed[idx][1] = end_value + start_list_list_completed[idx][3]
all_indexes_filled = True
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[2] is None or end_name_list_list[idx][1] is None:
all_indexes_filled = False
current_iteration += 1
if current_iteration >= max_outer_iterations:
print("Could not fill out start and end index list in given number of maximum outer iterations!")
sys.exit(1)
return start_list_list_completed, end_list_list_completed
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
dict_to_build = dict()
for local_count, interface_name_and_shortname in interface_dict.items():
dict_to_build.update(
{interface_name_and_shortname[0]: [local_count + count_start, interface_name_and_shortname[1]]}
)
self.mib_table.update(dict_to_build)
class ReturnValueParser(FileParser):
"""
Generic return value parser.
"""
def __init__(self, interfaces, file_list, print_tables):
super().__init__(file_list)
self.print_tables = print_tables
self.interfaces = interfaces
self.return_value_dict = dict()
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3
self.current_interface_id_entries = {
"Name": "",
"ID": 0,
"FullName": ""
}
self.return_value_dict.update({0: ('OK', 'System-wide code for ok.', 'RETURN_OK',
'HasReturnvaluesIF.h', 'HasReturnvaluesIF')})
self.return_value_dict.update({1: ('Failed', 'Unspecified system-wide code for failed.',
'RETURN_FAILED', 'HasReturnvaluesIF.h',
'HasReturnvaluesIF')})
def set_moving_window_mode(self, moving_window_size: int):
"""
Set moving window parsing mode
:param moving_window_size:
:return:
"""
super().set_moving_window_mode(DEFAULT_MOVING_WINDOWS_SIZE)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
"""
Former way to parse returnvalues. Not recommended anymore.
:param file_name:
:param args:
:param kwargs:
:return:
"""
if len(args) > 0:
print_truncated_entries = args[0]
else:
print_truncated_entries = False
all_lines = self._open_file(file_name=file_name)
for line in all_lines:
self.__handle_line_reading(line, file_name, print_truncated_entries)
def _handle_file_parsing_moving_window(self, file_name: str, current_line: int,
moving_window_size: int, moving_window: list, *args,
**kwargs):
interface_id_match = re.search(
rf"{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)", moving_window[self.moving_window_center_idx]
)
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match=interface_id_match, file_name=file_name)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)?[\s]*ReturnValue_t[\s]*([\w]*)[\s]*=[\s]*((?!;).*$)",
moving_window[self.moving_window_center_idx], re.DOTALL
)
full_returnvalue_string = ""
if returnvalue_match:
if ";" in returnvalue_match.group(0):
full_returnvalue_string = returnvalue_match.group(0)
else:
full_returnvalue_string = self.__build_multi_line_returnvalue_string(
moving_window=moving_window, first_line=moving_window[self.moving_window_center_idx]
)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w] *)[\s]*=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
full_returnvalue_string
)
if not returnvalue_match:
returnvalue_match = re.search(
r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
r'MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)',
full_returnvalue_string
)
if returnvalue_match:
description = self.__search_for_descrip_string(moving_window=moving_window)
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1), file_name=file_name, number_match=returnvalue_match.group(2),
description=description
)
pass
def __build_multi_line_returnvalue_string(
self, first_line: str, moving_window: List[str]
) -> str:
all_lines = first_line.rstrip()
end_found = False
current_idx = self.moving_window_center_idx
while not end_found and current_idx < len(moving_window) - 1:
current_idx += 1
string_to_add = moving_window[current_idx].lstrip()
if ";" in moving_window[current_idx]:
all_lines += string_to_add
break
else:
string_to_add.rstrip()
all_lines += string_to_add
return all_lines
def __search_for_descrip_string(self, moving_window: List[str]) -> str:
current_idx = self.moving_window_center_idx - 1
# Look at the line above first
descrip_match = re.search(
r"\[EXPORT\][\s]*:[\s]*\[COMMENT\]", moving_window[current_idx]
)
if not descrip_match:
while current_idx > 0:
current_idx -= 1
if re.search(r"^[\s]*static const(?:expr)? ReturnValue_t", moving_window[current_idx]):
break
descrip_match = re.search(
r"\[EXPORT\][\s]*:[\s]*\[COMMENT\]", moving_window[current_idx]
)
if descrip_match:
break
if descrip_match:
current_build_idx = current_idx
descrip_string = ""
while current_build_idx < self.moving_window_center_idx:
string_to_add = moving_window[current_build_idx].lstrip()
string_to_add = string_to_add.lstrip("//!<>")
string_to_add = string_to_add.rstrip()
descrip_string += string_to_add
current_build_idx += 1
else:
return ""
resulting_description = re.search(
r"\[EXPORT\][\s]*:[\s]*\[COMMENT\](.*)", descrip_string
)
return resulting_description.group(1)
def __handle_line_reading(self, line, file_name, print_truncated_entries: bool):
newline = line
if self.last_lines[0] != '\n':
two_lines = self.last_lines[0] + ' ' + newline.strip()
else:
two_lines = ''
interface_id_match = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)',
two_lines)
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match, file_name=file_name)
returnvalue_match = re.search(
r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
r'MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)',
two_lines
)
if returnvalue_match:
self.__handle_returnvalue_match(returnvalue_match, file_name, print_truncated_entries)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
def __handle_interfaceid_match(self, interface_id_match, file_name: str):
if DEBUG_INTERFACE_ID_COMPARISON:
print(f"Interface ID {interface_id_match.group(1)} found in {file_name}")
self.current_interface_id_entries["ID"] = \
self.interfaces[interface_id_match.group(1)][0]
self.current_interface_id_entries["Name"] = \
self.interfaces[interface_id_match.group(1)][1]
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
if DEBUG_INTERFACE_ID_COMPARISON:
current_id = self.current_interface_id_entries["ID"]
print(f"Current ID: {current_id}")
def __handle_returnvalue_match(
self, name_match: str, number_match: str, file_name: str, description: str
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"], name_match, MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES
)
full_id = (self.current_interface_id_entries["ID"] << 8) + return_number_from_string(number_match)
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
dict_tuple = (
string_to_add, description, number_match, file_name, self.current_interface_id_entries["FullName"]
)
self.return_value_dict.update({
full_id: dict_tuple
})
self.count = self.count + 1
def _post_parsing_operation(self):
if self.print_tables:
PrettyPrinter.pprint(self.return_value_dict)
self.mib_table = self.return_value_dict
@staticmethod
def export_to_file(filename: str, list_of_entries: dict, file_separator: str):
file = open(filename, "w")
for entry in list_of_entries.items():
file.write(hex(entry[0]) + file_separator + entry[1][0] + file_separator + entry[1][1] +
file_separator + entry[1][2] + file_separator
+ entry[1][3] + file_separator + entry[1][4] + '\n')
file.close()
def build_checked_string(self, first_part, second_part, max_string_len: int,
print_truncated_entries: bool):
""" Build a checked string """
my_str = first_part + '_' + self.convert(second_part)
if len(my_str) > max_string_len:
if print_truncated_entries:
print("Warning: Entry " + my_str + " too long. Will truncate.")
my_str = my_str[0:max_string_len]
else:
# print("Entry: " + myStr + " is all right.")
pass
return my_str
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(descr_string):
description = descr_string.lstrip('!<- ')
if description == '':
description = ' '
return description
def return_number_from_string(a_string):
if a_string.startswith('0x'):
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
print('Error: Illegal number representation: ' + a_string)
return 0
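To make the parsing target concrete: the MAKE_RETURN_CODE regex above matches declarations of the following shape (the header line is invented for illustration):
import re

line = "static const ReturnValue_t BUFFER_TOO_SMALL = MAKE_RETURN_CODE(0x01);"
match = re.search(
    r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
    r"MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)",
    line,
)
assert match is not None
assert match.group(1) == "BUFFER_TOO_SMALL"
assert match.group(2) == "0x01"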

View File

@@ -1,25 +0,0 @@
#! /usr/bin/python3.8
# -*- coding: utf-8 -*-
import shutil
import os
def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False):
if os.path.exists(filename):
try:
shutil.copy2(filename, destination)
except FileNotFoundError as error:
print("copy_file: File not found!")
print(error)
except shutil.SameFileError:
print("copy_file: Source and destination are the same!")
def move_file(file_name: str, destination: str = ""):
if os.path.exists(file_name):
try:
shutil.copy2(file_name, destination)
os.remove(file_name)
except FileNotFoundError as error:
print("File not found!")
print(error)