27 Commits

Author SHA1 Message Date
7eb6a6f7a8 changelog 2025-03-18 14:37:06 +01:00
bb7886f063 bump fsfwgen version 2025-03-18 14:36:22 +01:00
2d7a2a09b8 Merge pull request 'Added new compile time const event parsing' (#4) from baumgartl/events into main
Reviewed-on: #4
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
2025-03-18 14:34:56 +01:00
76139f3934 added new compile time const event parsing 2025-03-14 13:22:10 +01:00
b9c8c9880f prep v0.3.4 2025-01-13 10:40:41 +01:00
7c14b2a2e0 prep v0.3.4 2025-01-13 10:38:07 +01:00
2403f3a019 prep v0.3.3 2025-01-13 10:33:51 +01:00
f52073d117 some minor updates for linting and formatting 2025-01-13 10:28:02 +01:00
2b31b91237 Merge pull request 'Event Parser Improvements' (#2) from meier/event-parser-improvements into main
Reviewed-on: #2
Reviewed-by: Robin Müller <muellerr@irs.uni-stuttgart.de>
2025-01-09 12:30:16 +01:00
497781555d Jakob Meier
- supports definition of event ids in hexadecimal format
- subsystem id can also be extracted when it is not defined separately in the header file
2024-12-22 18:16:59 +01:00
bbe55592ec prep v0.3.2 2023-03-24 15:37:36 +01:00
fe6c68d97b add explicit handling for duplicate event names
when writing the translation file
2023-03-24 15:33:11 +01:00
cada155b8e prep v0.3.1 2023-03-14 15:24:55 +01:00
c40db0c10d bump version 2023-02-19 13:20:09 +01:00
1f1d7ab62a add support for skipping retvals and events 2023-02-19 12:58:08 +01:00
98ecaba93a raise exception instead of exiting with useless error 2023-02-09 15:57:56 +01:00
66e31885a7 add changelog and remove logging module 2023-02-09 15:02:52 +01:00
b1e5a2d40a small tweak for updated fsfw 2022-08-24 17:26:49 +02:00
e84be4bb17 now it works again 2022-08-09 10:37:50 +02:00
fd9838bcba improved the file connector algorithm 2022-08-09 10:17:31 +02:00
9c412ace74 extended TODO 2022-07-20 11:04:53 +02:00
911aa0d89d added todo 2022-07-20 11:02:27 +02:00
a5dee6e417 improved csv format 2022-06-21 01:21:01 +02:00
a2e0c4f98e some more minor improvements 2022-06-21 00:57:01 +02:00
1b1ac86e8c more improvements and fixes for fsfwgen 2022-06-21 00:51:13 +02:00
36b44d1e26 new Path handling 2022-06-20 18:02:46 +02:00
fc191cc50e Refactor Path handling 2022-06-20 16:56:05 +02:00
15 changed files with 509 additions and 589 deletions

CHANGELOG.md (new file, 40 additions)

@@ -0,0 +1,40 @@
Change Log
=======
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
# [unreleased]
# [v0.4.0]
- Update event handling for new `constexpr` templated arguments (the supported declaration styles are sketched below this changelog)
# [v0.3.4]
- Hotfixes for pyproject.toml file
# [v0.3.3]
- Fixes for event parsing
- Removed `setup.cfg` and `setup.py`
# [v0.3.2]
- Added handling for duplicate event names when writing the event translation
file.
# [v0.3.1]
- Sorted returnvalue export by raw returnvalue.
# [v0.3.0]
- Description parser is now more generic and parses a list of meta information.
- Added support for skip directive when parsing meta information.
# [v0.2.0]
- Remove `logging` module and move to more pythonic logging usage.
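
A quick illustration of what the v0.4.0 entry means for parsed C++ sources. This is a minimal sketch, not part of the repository: the patterns are adapted from the event parser diff further below, and identifiers such as SOME_EVENT and SUBSYSTEM_ID are made up for illustration.

import re

# Patterns adapted from __generate_regex_event_match below: the classic MAKE_EVENT
# macro, the runtime event::makeEvent() call, and the new compile-time templated
# event::makeEvent<...>() form.
MACRO_API = (
    r"static const(?:expr)? Event\s*([\w]+)\s*=\s*"
    r"MAKE_EVENT\((0x[0-9a-fA-F]+|[0-9]{1,3}),\s*severity::([A-Z]+)\)\s*;"
)
FUNC_API = (
    r"static const(?:expr)? Event\s*([\w]+)\s*=\s*event::makeEvent"
    r"\(([\w:]+),\s*(0x[0-9a-fA-F]+|[0-9]{1,3})\s*,\s*severity::([A-Z]+)\)\s*;"
)
TEMPLATED_API = (
    r"static const(?:expr)? Event\s*([\w]+)\s*=\s*event::makeEvent"
    r"<([\w:]+),\s*(0x[0-9a-fA-F]+|[0-9]{1,3})\s*,\s*severity::([A-Z]+)>\(\)\s*;"
)

lines = [
    'static const Event SOME_EVENT = MAKE_EVENT(0x0A, severity::INFO);',
    'static constexpr Event SOME_EVENT = event::makeEvent(SUBSYSTEM_ID, 10, severity::INFO);',
    'static constexpr Event SOME_EVENT = event::makeEvent<SUBSYSTEM_ID, 10, severity::INFO>();',
]
for line, pattern in zip(lines, [MACRO_API, FUNC_API, TEMPLATED_API]):
    assert re.search(pattern, line) is not None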

fsfwgen/__init__.py

@@ -1,5 +1,5 @@
__version__ = "0.1.0"
VERSION_MAJOR = 0
VERSION_MINOR = 1
VERSION_REVISION = 0
__version__ = "0.3.2"
VERSION_MAJOR = 0
VERSION_MINOR = 3
VERSION_REVISION = 2

fsfwgen/core.py

@@ -1,73 +1,7 @@
import enum
import sys
import colorlog
import logging
import argparse
CONSOLE_LOGGER_NAME = "FSFW Generator Logger"
LOGGER_INSTANCE = None
class InfoFilter(logging.Filter):
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.INFO:
return rec.levelno
return None
class DebugFilter(logging.Filter):
"""Filter object, which is used so that only DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.DEBUG:
return rec.levelno
return None
def get_console_logger():
global LOGGER_INSTANCE
if LOGGER_INSTANCE is None:
LOGGER_INSTANCE = init_console_logger()
return LOGGER_INSTANCE
def init_console_logger():
logger = colorlog.getLogger(CONSOLE_LOGGER_NAME)
generic_format = colorlog.ColoredFormatter(
"%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s"
)
fault_format = colorlog.ColoredFormatter(
fmt="%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d "
"[%(filename)s:%(lineno)d] %(reset)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
console_info_handler = colorlog.StreamHandler(stream=sys.stdout)
console_info_handler.setLevel(logging.INFO)
console_info_handler.addFilter(InfoFilter())
console_debug_handler = logging.StreamHandler(stream=sys.stdout)
console_debug_handler.setLevel(logging.DEBUG)
console_debug_handler.addFilter(DebugFilter())
console_info_handler.setFormatter(generic_format)
console_info_handler.addFilter(InfoFilter())
console_debug_handler.addFilter(DebugFilter())
console_error_handler = logging.StreamHandler(stream=sys.stderr)
console_error_handler.setLevel(logging.WARNING)
console_error_handler.setFormatter(fault_format)
logger.addHandler(console_info_handler)
logger.addHandler(console_debug_handler)
logger.addHandler(console_error_handler)
logger.setLevel(logging.DEBUG)
return logger
class ParserTypes(enum.Enum):
EVENTS = "events"
OBJECTS = "objects"
@@ -76,8 +10,6 @@ class ParserTypes(enum.Enum):
def init_printout(project_string: str):
global LOGGER_INSTANCE
LOGGER_INSTANCE = get_console_logger()
print(f"-- {project_string} MIB Generator --")

Event parser module

@@ -1,16 +1,13 @@
import re
import os
from typing import List
import logging
from pathlib import Path
from typing import List, Optional, Dict
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger
from fsfwgen.parserbase.parser import FileParser, MetaType
_LOGGER = logging.getLogger(__name__)
LOGGER = get_console_logger()
EVENT_ENTRY_NAME_IDX = 0
EVENT_ENTRY_SEVERITY_IDX = 1
EVENT_ENTRY_INFO_IDX = 2
EVENT_SOURCE_FILE_IDX = 3
FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"'
DEFAULT_MOVING_WINDOWS_SIZE = 7
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
@@ -49,35 +46,42 @@ class SubsystemDefinitionParser(FileParser):
pass
class EventEntry:
def __init__(self, name: str, severity: str, description: str, file_name: Path):
self.name = name
self.severity = severity
self.description = description
self.file_name = file_name
def __repr__(self):
return (
f"EventEntry(name={self.name!r}, severity={self.severity!r}, "
f"description={self.description!r}, file_name={self.file_name!r}"
)
EventDictT = Dict[int, EventEntry]
class EventParser(FileParser):
def __init__(self, file_list: List[str], interface_list):
def __init__(self, file_list: List[Path], interface_list, moving_window_size: int = 7):
super().__init__(file_list)
self.set_moving_window_mode(moving_window_size)
self.interfaces = interface_list
self.count = 0
self.my_id = 0
self.current_id = 0
self.obsw_root_path = None
self.mib_table: EventDictT = dict()
self.obsw_root_path: Optional[Path] = None
self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
try:
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
total_count = 0
for line in all_lines:
self.__handle_line_reading(line, file_name)
if self.count > 0:
print("File " + file_name + " contained " + str(self.count) + " events.")
total_count += self.count
self.count = 0
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
logging.warning("Regular file parsing mode not implemented")
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -85,16 +89,14 @@ class EventParser(FileParser):
**kwargs,
):
subsystem_id_assignment_match = re.search(
rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);",
rf"{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*)",
moving_window[self.moving_window_center_idx],
)
if subsystem_id_assignment_match:
# For now, it is assumed that there is only going to be one subsystem ID per
# class / source file
try:
self.current_id = self.interfaces[
subsystem_id_assignment_match.group(2)
][0]
self.current_id = self.interfaces[subsystem_id_assignment_match.group(1)][0]
self.my_id = self.return_number_from_string(self.current_id)
except KeyError as e:
print(f"Key not found: {e}")
@@ -127,7 +129,7 @@ class EventParser(FileParser):
)
def __handle_event_match(
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
self, event_match, macro_api_match: bool, moving_window: list, file_name: Path
):
if ";" in event_match.group(0):
event_full_match = self.__generate_regex_event_match(
@@ -141,10 +143,16 @@ class EventParser(FileParser):
event_full_match = self.__generate_regex_event_match(
macro_api_match=macro_api_match, full_string=multi_line_string
)
description = self._search_for_descrip_string_generic(
description = "No description"
meta_list = self._search_for_descrip_string_generic(
moving_window=moving_window,
break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*",
)
for meta in meta_list:
if meta.type == MetaType.SKIP:
return
elif meta.type == MetaType.DESC:
description = meta.value
if event_full_match:
name = event_match.group(EVENT_NAME_IDX)
if macro_api_match:
@@ -153,19 +161,21 @@ class EventParser(FileParser):
)
severity = event_full_match.group(3)
else:
if event_full_match.group(1) == "EV_REPLY_INVALID_SIZE":
print(f"Group 3: {event_full_match.group(3)}")
full_id = (self.my_id * 100) + self.return_number_from_string(
event_full_match.group(3)
)
severity = event_full_match.group(4)
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
file_name = file_name.relative_to(self.obsw_root_path)
if self.mib_table.get(full_id) is not None:
LOGGER.warning(f"Duplicate event ID {full_id} detected")
LOGGER.info(
f"Name: {self.mib_table.get(full_id)[0]}| "
f"Description: {self.mib_table.get(full_id)[2]}"
_LOGGER.warning(f"Duplicate event ID {full_id} detected")
_LOGGER.info(
f"Name: {self.mib_table.get(full_id).name}| "
f"Description: {self.mib_table.get(full_id).description}"
)
self.mib_table.update({full_id: (name, severity, description, file_name)})
self.mib_table.update({full_id: EventEntry(name, severity, description, file_name)})
self.count = self.count + 1
@staticmethod
@@ -174,19 +184,25 @@ class EventParser(FileParser):
# One line event definition.
regex_string = (
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
)
else:
regex_string = (
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;"
r"MAKE_EVENT\((0x[0-9a-fA-F]+|[0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
)
return re.search(regex_string, full_string)
# Non compiletime const version kept for backwards compatibility
regex_string = r"static const(?:expr)? Event\s*([\w]+)\s*=\s*event::makeEvent\(([\w:]+),\s*(0x[0-9a-fA-F]+|[0-9]{1,3})\s*,\s*severity::([A-Z]+)\)\s*;"
event_full_match = re.search(regex_string, full_string)
# Using old, non compiletime const version
if event_full_match:
return event_full_match
# Using compiletime const version
regex_string = r"static const(?:expr)? Event\s*([\w]+)\s*=\s*event::makeEvent<([\w:]+),\s*(0x[0-9a-fA-F]+|[0-9]{1,3})\s*,\s*severity::([A-Z]+)>\(\)\s*;"
event_full_match = re.search(regex_string, full_string)
return event_full_match
def __build_multi_line_event_string(
self, first_line: str, moving_window: List[str]
) -> str:
def __build_multi_line_event_string(self, first_line: str, moving_window: List[str]) -> str:
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
)
@@ -194,56 +210,11 @@ class EventParser(FileParser):
def _post_parsing_operation(self):
pass
def __handle_line_reading(self, line, file_name: str):
if not self.last_lines[0] == "\n":
twolines = self.last_lines[0] + " " + line.strip()
else:
twolines = ""
match1 = re.search(
r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
)
if match1:
self.current_id = self.interfaces[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
self.my_id = self.return_number_from_string(self.current_id)
match = re.search(
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
twolines,
)
if match:
if match.group(1):
self.last_lines[0] = line
return
description = " "
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
full_id = (self.my_id * 100) + self.return_number_from_string(
match.group(3)
)
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
# Replace backslashes with regular slashes
file_name.replace("\\", "/")
self.mib_table.update(
{full_id: (string_to_add, severity, description, file_name)}
)
self.count = self.count + 1
self.last_lines[0] = line
def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part)
if len(my_str) > 16:
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
my_str = my_str[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return my_str
@staticmethod
@@ -275,92 +246,82 @@ class EventParser(FileParser):
return description
def export_to_file(filename: str, event_list: list, file_separator: str):
file = open(filename, "w")
for entry in event_list:
event_id = int(entry[0])
event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}"
file.write(
str(event_id)
+ file_separator
+ event_id_as_hex
+ file_separator
+ event_value[EVENT_ENTRY_NAME_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_SEVERITY_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_INFO_IDX]
+ file_separator
+ event_value[EVENT_SOURCE_FILE_IDX]
+ "\n"
file.close()
return
def export_to_csv(filename: Path, event_list: EventDictT, col_sep: str):
with open(filename, "w") as out:
fsep = col_sep
out.write(
f"Event ID (dec){col_sep} Event ID (hex){col_sep} Name{col_sep} "
f"Severity{col_sep} Description{col_sep} File Path\n"
)
for entry in event_list.items():
event_id = int(entry[0])
event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}"
out.write(
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}"
f"{event_value.severity}{fsep}{event_value.description}"
f"{fsep}{event_value.file_name.as_posix()}\n"
)
def write_translation_source_file(
event_list: list, date_string: str, filename: str = "translateEvents.cpp"
event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp"
):
outputfile = open(filename, "w")
definitions = ""
# Look up table to avoid duplicate events
lut = dict()
function = (
"const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
)
for entry in event_list:
event_id = entry[0]
event_value = entry[1]
if event_value[EVENT_ENTRY_NAME_IDX] not in lut:
definitions += (
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING "
f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n'
)
function += (
f" case ({event_id}):\n "
f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
)
lut.update({event_value[EVENT_ENTRY_NAME_IDX] : event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n'
outputfile.write(
f"/**\n * @brief Auto-generated event translation file. "
f"Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
outputfile.write('#include "translateEvents.h"\n\n')
outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n")
outputfile.close()
with open(filename, "w") as out:
definitions = ""
# Look up table to avoid duplicate events
lut = dict()
function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
for entry in event_list.items():
event_id = entry[0]
event_value = entry[1]
name = event_value.name
if name in lut:
_LOGGER.warning(
"Duplicate name detected when generating event translation source file"
)
_LOGGER.warning(f"Name: {name}, Event Entry: {event_value}")
name = f"{name}_{event_id}"
_LOGGER.info(f"Created unique name {name}")
definitions += f"const char *{name}_STRING " f'= "{name}";\n'
function += f" case ({event_id}):\n " f"return {name}_STRING;\n"
lut.update({name: event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n'
out.write(
f"/**\n * @brief Auto-generated event translation file. "
f"Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
out.write('#include "translateEvents.h"\n\n')
out.write(definitions + "\n" + function + " }\n return 0;\n}\n")
def write_translation_header_file(filename: str = "translateEvents.h"):
file = open(filename, "w")
file.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char *translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def write_translation_header_file(filename: Path = "translateEvents.h"):
with open(filename, "w") as out:
out.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char *translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
def handle_csv_export(file_name: Path, event_list: EventDictT, file_separator: str):
"""
Generates the CSV in the same directory as the .py file and copies the CSV to another
directory if specified.
"""
export_to_file(
filename=file_name, event_list=event_list, file_separator=file_separator
)
export_to_csv(filename=file_name, event_list=event_list, col_sep=file_separator)
def handle_cpp_export(
event_list: list,
event_list: EventDictT,
date_string: str,
file_name: str = "translateEvents.cpp",
file_name: Path = "translateEvents.cpp",
generate_header: bool = True,
header_file_name: str = "translateEvents.h",
header_file_name: Path = "translateEvents.h",
):
write_translation_source_file(
event_list=event_list, date_string=date_string, filename=file_name
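
As a sanity check of the event ID scheme used above (a hedged example with made-up numbers): the full event ID is the subsystem ID times 100 plus the per-subsystem offset, and the CSV export prints it both in decimal and as zero-padded hex.

subsystem_id = 22   # hypothetical SUBSYSTEM_ID value
offset = 5          # event number passed to MAKE_EVENT / event::makeEvent
full_id = (subsystem_id * 100) + offset
assert full_id == 2205
assert f"{full_id:#06x}" == "0x089d"  # same formatting as f"{event_id:#06x}" above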

Object parser module

@@ -1,18 +1,20 @@
import logging
import re
from pathlib import Path
from typing import List
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger
from fsfwgen.utility.sql_writer import SqlWriter
LOGGER = get_console_logger()
_LOGGER = logging.getLogger(__name__)
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: list):
def __init__(self, file_list: List[Path]):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
@@ -21,7 +23,7 @@ class ObjectDefinitionParser(FileParser):
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -42,38 +44,37 @@ def export_object_file(filename, object_list, file_separator: str = ","):
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
outputfile = open(filename, "w")
LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = (
"const char *translateObject(object_id_t object) "
"{\n switch ((object & 0xFFFFFFFF)) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
outputfile.write(
f"/**\n * @brief Auto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
outputfile.write('#include "translateObjects.h"\n\n')
outputfile.write(definitions + "\n" + function + " return 0;\n}\n")
outputfile.close()
with open(filename, "w") as out:
_LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = (
"const char *translateObject(object_id_t object) "
"{\n switch ((object & 0xFFFFFFFF)) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
out.write(
f"/**\n * @brief Auto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
out.write('#include "translateObjects.h"\n\n')
out.write(definitions + "\n" + function + " return 0;\n}\n")
def write_translation_header_file(filename: str = "translateObjects.h"):
file = open(filename, "w")
file.write(
f"#ifndef FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n"
f"#define FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n\n"
f"#include <fsfw/objectmanager/SystemObjectIF.h>\n\n"
f"const char *translateObject(object_id_t object);\n\n"
f"#endif /* FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_ */\n"
"#ifndef FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n"
"#define FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_\n\n"
"#include <fsfw/objectmanager/SystemObjectIF.h>\n\n"
"const char *translateObject(object_id_t object);\n\n"
"#endif /* FSFWCONFIG_OBJECTS_TRANSLATEOBJECTS_H_ */\n"
)
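
For orientation, a hedged sketch of the entry layout write_translation_file consumes, with a made-up object ID and name: each entry pairs the hexadecimal object ID string with a list whose first element is the object name.

entries = [("0x44120006", ["TEST_OBJECT"])]  # hypothetical values
# write_translation_file("translateObjects.cpp", entries, "2025-03-18") would emit:
#   const char *TEST_OBJECT_STRING = "TEST_OBJECT";
#   case 0x44120006:
#     return TEST_OBJECT_STRING;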

File list parser module

@@ -1,13 +1,13 @@
"""Generic File Parser class
Used to parse header files. Implemented as a class in case the header parser becomes more complex
"""
import os
import re
from typing import Union
from fsfwgen.core import get_console_logger
import logging
from pathlib import Path
from typing import Union, List
LOGGER = get_console_logger()
_LOGGER = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
@@ -17,14 +17,14 @@ class FileListParser:
TODO: Filter functionality for each directory to filter out files or folders
"""
def __init__(self, directory_list_or_name: Union[str, list]):
def __init__(self, directory_list_or_name: Union[Path, List[Path]]):
self.directory_list = []
if isinstance(directory_list_or_name, str):
if isinstance(directory_list_or_name, Path):
self.directory_list.append(directory_list_or_name)
elif isinstance(directory_list_or_name, list):
elif isinstance(directory_list_or_name, List):
self.directory_list.extend(directory_list_or_name)
else:
LOGGER.warning(
_LOGGER.warning(
"Header Parser: Passed directory list is not a header name or list of header names"
)
self.header_files = []
@@ -34,7 +34,7 @@ class FileListParser:
search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False,
):
) -> List[Path]:
"""This function is called to get a list of header files
:param search_recursively:
:param printout_string:
@@ -43,39 +43,24 @@ class FileListParser:
"""
print(printout_string, end="")
for directory in self.directory_list:
self.__get_header_file_list(
directory, search_recursively, print_current_dir
)
self.__get_header_file_list(directory, search_recursively, print_current_dir)
print(str(len(self.header_files)) + " header files were found.")
# g.PP.pprint(self.header_files)
return self.header_files
def __get_header_file_list(
self,
base_directory: str,
base_directory: Path,
seach_recursively: bool = False,
print_current_dir: bool = False,
):
if base_directory[-1] != "/":
base_directory += "/"
local_header_files = []
if print_current_dir:
print("Parsing header files in: " + base_directory)
base_list = os.listdir(base_directory)
# g.PP.pprint(base_list)
for entry in base_list:
header_file_match = re.match(r"[_.]*.*\.h", entry)
if header_file_match:
if os.path.isfile(base_directory + entry):
match_string = header_file_match.group(0)
if match_string[0] == "." or match_string[0] == "_":
pass
else:
local_header_files.append(base_directory + entry)
print(f"Parsing header files in: {base_directory}")
for entry in base_directory.iterdir():
if entry.is_file() and entry.suffix == ".h" and entry.name[0] not in [".", "_"]:
local_header_files.append(entry)
if seach_recursively:
next_path = base_directory + entry
if os.path.isdir(next_path):
self.__get_header_file_list(next_path, seach_recursively)
# print("Files found in: " + base_directory)
# g.PP.pprint(local_header_files)
if entry.is_dir():
self.__get_header_file_list(entry, seach_recursively)
self.header_files.extend(local_header_files)
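
A minimal usage sketch of the reworked pathlib-based API, assuming a local fsfw source tree (the path is hypothetical):

from pathlib import Path

parser = FileListParser(Path("fsfw"))
header_files = parser.parse_header_files(search_recursively=True)  # returns List[Path]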

fsfwgen/parserbase/parser.py

@@ -11,10 +11,14 @@ Child classes fill out the MIB table (self.mib_table)
@author R. Mueller
@date 14.11.2019
"""
import dataclasses
import enum
import re
from abc import abstractmethod
from pathlib import Path
from typing import Dict, List
from enum import Enum
class VerbosityLevels(enum.Enum):
@@ -23,9 +27,20 @@ class VerbosityLevels(enum.Enum):
DEBUG = 2
class FileParserModes(enum.Enum):
REGULAR = (enum.auto(),)
MOVING_WINDOW = enum.auto()
class FileParserModes(Enum):
REGULAR = 1
MOVING_WINDOW = 2
class MetaType(enum.IntEnum):
SKIP = 1
DESC = 2
@dataclasses.dataclass
class MetaInfo:
type: MetaType
value: str = ""
class FileParser:
@@ -39,7 +54,7 @@ class FileParser:
will be passed through to the abstract function implementations.
"""
def __init__(self, file_list):
def __init__(self, file_list: List[Path]):
if len(file_list) == 0:
print("File list is empty !")
self.file_list_empty = True
@@ -94,7 +109,7 @@ class FileParser:
:return: Returns the mib table dictionary.
"""
if self.file_list_empty:
print(f"Nothing to parse, supplied file list is empty!")
print("Nothing to parse, supplied file list is empty!")
return self.mib_table
if self.__parser_mode == FileParserModes.REGULAR:
@@ -111,7 +126,7 @@ class FileParser:
return self.mib_table
@abstractmethod
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""
Implemented by child class. The developer should fill the info table (self.mib_table)
in this routine
@@ -125,7 +140,7 @@ class FileParser:
@abstractmethod
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -151,7 +166,7 @@ class FileParser:
:return:
"""
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
def __parse_file_with_moving_window(self, file_name: Path, *args, **kwargs):
all_lines = self._open_file(file_name=file_name)
moving_window_size = self.__parser_args
if moving_window_size == 0:
@@ -159,10 +174,7 @@ class FileParser:
return
moving_window = [""] * moving_window_size
for line_idx, line in enumerate(all_lines):
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print(f"Moving window pre line anaylsis line {line_idx}")
print(moving_window)
# The moving window will start with only the bottom being in the file
@@ -180,10 +192,7 @@ class FileParser:
for idx in range(moving_window_size - 1):
moving_window[idx] = moving_window[idx + 1]
moving_window[moving_window_size - 1] = line
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print(f"Moving window post line analysis line {line_idx}")
print(moving_window)
self._handle_file_parsing_moving_window(
@@ -192,27 +201,21 @@ class FileParser:
# Now the moving window moved past the end of the file. Sections which are outside
# the file are assigned an empty string until the window has moved out of file completely
for remaining_windows_idx in range(moving_window_size):
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window pre line analysis post EOF")
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print("Moving window pre line analysis post EOF")
print(moving_window)
num_entries_to_clear = remaining_windows_idx + 1
for idx_to_clear in range(num_entries_to_clear):
moving_window[moving_window_size - 1 - idx_to_clear] = ""
for idx_to_reassign in range(moving_window_size - 1 - num_entries_to_clear):
moving_window[idx_to_reassign] = moving_window[idx_to_reassign + 1]
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window post line anaylsis post EOF")
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
print("Moving window post line analysis post EOF")
print(moving_window)
pass
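# Illustration (editor's note, not part of the class): for a window size of 5,
# the hook above sees the window slide like this over lines [l0, l1, l2, ...]:
#   ["", "", "", "", l0]    # window entering the file
#   ["", "", "", l0, l1]
#   [l0, l1, l2, l3, l4]    # window fully inside the file
#   [l1, l2, l3, l4, ""]    # past EOF: entries are cleared from the bottom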
@staticmethod
def _open_file(file_name: str) -> list:
def _open_file(file_name: Path) -> list:
"""
Open a file, attempting common encodings utf-8 and cp1252
:param file_name:
@@ -222,14 +225,12 @@ class FileParser:
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
print("ReturnValueParser: Decoding error with file " + file_name)
print(f"Parser: Decoding error with file {file_name}")
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
return all_lines
def _build_multi_line_string_generic(
self, first_line: str, moving_window: List[str]
) -> str:
def _build_multi_line_string_generic(self, first_line: str, moving_window: List[str]) -> str:
"""This function transforms a multi line match into a one line match by searching for the
semicolon at the string end"""
all_lines = first_line.rstrip()
@@ -248,38 +249,34 @@ class FileParser:
def _search_for_descrip_string_generic(
self, moving_window: List[str], break_pattern: str
) -> str:
) -> List[MetaInfo]:
current_idx = self._moving_window_center_idx - 1
# Look at the line above first
descrip_match = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]
)
if not descrip_match:
export_match = re.search(r"\[EXPORT]\s*:\s*\[(\w*)]", moving_window[current_idx])
if not export_match:
while True:
if re.search(break_pattern, moving_window[current_idx]):
break
descrip_match = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]
)
if descrip_match:
break
if current_idx <= 0:
export_match = re.search(r"\[EXPORT]\s*:\s*\[(\w*)]", moving_window[current_idx])
if export_match or current_idx <= 0:
break
current_idx -= 1
if descrip_match:
current_build_idx = current_idx
descrip_string = ""
while current_build_idx < self._moving_window_center_idx:
string_to_add = moving_window[current_build_idx].lstrip()
string_to_add = string_to_add.lstrip("//!<>")
string_to_add = string_to_add.rstrip()
descrip_string += string_to_add
current_build_idx += 1
else:
return ""
resulting_description = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT][\s](.*)", descrip_string
)
if resulting_description:
return resulting_description.group(1)
return ""
info = MetaInfo(MetaType.DESC)
if export_match:
if export_match.group(1).lower() == "comment":
current_build_idx = current_idx
descrip_string = ""
while current_build_idx < self._moving_window_center_idx:
string_to_add = moving_window[current_build_idx].lstrip()
string_to_add = string_to_add.lstrip("//!<>")
string_to_add = string_to_add.rstrip()
descrip_string += string_to_add
current_build_idx += 1
resulting_description = re.search(r"\[EXPORT]\s*:\s*\[\w*]\s(.*)", descrip_string)
if resulting_description:
info.value = resulting_description.group(1)
elif export_match.group(1).lower() == "skip":
info.type = MetaType.SKIP
return [info]
return []
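
The rewritten _search_for_descrip_string_generic above scans upwards from the matched declaration for [EXPORT] meta directives in comments. A small hedged demo; the C++ comment lines are made up:

import re

comment_line = "//! [EXPORT] : [COMMENT] Indicates a very important event."
skip_line = "//! [EXPORT] : [SKIP]"

export_pattern = r"\[EXPORT]\s*:\s*\[(\w*)]"
assert re.search(export_pattern, comment_line).group(1).lower() == "comment"
assert re.search(export_pattern, skip_line).group(1).lower() == "skip"
# The description is whatever follows the tag:
desc = re.search(r"\[EXPORT]\s*:\s*\[\w*]\s(.*)", comment_line).group(1)
assert desc == "Indicates a very important event."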

Returnvalue and interface parser module

@@ -1,13 +1,14 @@
import os.path
import logging
import re
import sys
from typing import List, Tuple, Optional
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels, MetaInfo, MetaType
from fsfwgen.utility.printer import PrettyPrinter
from fsfwgen.core import get_console_logger
LOGGER = get_console_logger()
_LOGGER = logging.getLogger(__name__)
# Intermediate solution
MAX_STRING_LEN = 80
@@ -20,14 +21,34 @@ DEFAULT_MOVING_WINDOWS_SIZE = 7
INVALID_IF_ID = -1
@dataclass
class FileStartHelper:
start_name: str
start_name_or_value: str
count: int
cumulative_start_index: Optional[int] = None
@dataclass
class FileEndHelper:
end_name: str
cumulative_end_value: Optional[int] = None
@dataclass
class FileConnHelper:
file_name: str
sh: Optional[FileStartHelper]
eh: Optional[FileEndHelper]
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
self.file_table_list = []
self.file_name_table = []
self.start_name_list = []
self.end_name_list = []
self.file_conn_helpers: Optional[List[FileConnHelper]] = None
self._debug_mode = False
def enable_debug_mode(self, enable: bool):
@@ -60,23 +81,28 @@ class InterfaceParser(FileParser):
start_matched = False
end_matched = False
start_name = ""
target_end_name = ""
first_entry_name_or_index = ""
file_conn_entry = FileConnHelper(file_name, None, None)
for line in all_lines:
if not start_matched:
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
target_end_name = match.group(2)
first_entry_name_or_index = match.group(2)
start_matched = True
else:
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
if match:
count += 1
# It is expected that the last entry is explicitly marked like this.
# TODO: Could also simply remember last entry and then designate that as end
# entry as soon as "}" is found. Requires moving window mode though
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
last_entry_name = match.group(1)
end_matched = True
self.end_name_list.append([last_entry_name, None])
file_conn_entry.eh = FileEndHelper(last_entry_name, None)
break
else:
short_name = match.group(2)
if short_name == "":
@@ -84,58 +110,69 @@ class InterfaceParser(FileParser):
current_file_table.update({count: [match.group(1), short_name]})
if not start_matched:
print("No start match detected when parsing interface files..")
print(
f"Current file: {file_name} | Make sure to include a start definition"
)
print(f"Current file: {file_name} | Make sure to include a start definition")
sys.exit(1)
if not end_matched:
print(
"No end match detected when parsing interface files. "
"Make sure to use [EXPORT] : [END]"
)
sys.exit(1)
raise ValueError(
f"No end match detected when parsing interface file {file_name}. "
f"Make sure to use [EXPORT] : [END]"
)
self.start_name_list.append([start_name, target_end_name, None, count])
file_conn_entry.sh = FileStartHelper(start_name, first_entry_name_or_index, count, None)
if self.file_conn_helpers is None:
self.file_conn_helpers = []
self.file_conn_helpers.append(file_conn_entry)
self.file_name_table.append(file_name)
self.file_table_list.append(current_file_table)
def _post_parsing_operation(self):
self.start_name_list, self.end_name_list = self.__assign_start_end_indexes(
self.start_name_list, self.end_name_list
)
self.__assign_start_end_indexes()
self._print_start_end_info()
for idx, file_table in enumerate(self.file_table_list):
self.__build_mod_interface_table(self.start_name_list[idx][2], file_table)
self.__build_mod_interface_table(
self.file_conn_helpers[idx].sh.cumulative_start_index, file_table
)
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
@staticmethod
def __assign_start_end_indexes(
start_name_list_list, end_name_list_list
) -> Tuple[List, List]:
start_list_list_completed = start_name_list_list
end_list_list_completed = end_name_list_list
def _print_start_end_info(self):
for conn_helper in self.file_conn_helpers:
print(
f"Detected {conn_helper.sh.count} entries in {conn_helper.file_name}, "
f"end index {conn_helper.eh.cumulative_end_value}"
)
def __assign_start_end_indexes(self):
conn_helpers_old = self.file_conn_helpers.copy()
all_indexes_filled = False
max_outer_iterations = 15
current_iteration = 0
while not all_indexes_filled:
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[1].isdigit():
start_list_list_completed[idx][2] = int(start_name_list[1])
end_list_list_completed[idx][1] = (
start_list_list_completed[idx][2]
+ start_list_list_completed[idx][3]
for idx, conn_helper in enumerate(conn_helpers_old):
sh = conn_helper.sh
# In the very first file, the first index is expected to be a plain number
if sh.start_name_or_value.isdigit():
sh.cumulative_start_index = int(sh.start_name_or_value)
conn_helpers_old[idx].eh.cumulative_end_value = (
sh.cumulative_start_index + sh.count
)
target_end_name = start_name_list[1]
for end_name_list in end_list_list_completed:
end_name = end_name_list[0]
end_value = end_name_list[1]
if end_name == target_end_name and end_value is not None:
start_list_list_completed[idx][2] = end_value
end_list_list_completed[idx][1] = (
end_value + start_list_list_completed[idx][3]
# Now, we try to connect the start and end of the files using the start and end
# names respectively
end_name_to_search = conn_helper.sh.start_name_or_value
for end_name_helper in conn_helpers_old:
eh = end_name_helper.eh
if eh.end_name == end_name_to_search and eh.cumulative_end_value is not None:
self.file_conn_helpers[
idx
].sh.cumulative_start_index = eh.cumulative_end_value
self.file_conn_helpers[idx].eh.cumulative_end_value = (
eh.cumulative_end_value + self.file_conn_helpers[idx].sh.count
)
all_indexes_filled = True
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[2] is None or end_name_list_list[idx][1] is None:
for idx, conn_helper in enumerate(conn_helpers_old):
if (
conn_helper.sh.cumulative_start_index is None
or conn_helper.eh.cumulative_end_value is None
):
all_indexes_filled = False
current_iteration += 1
if current_iteration >= max_outer_iterations:
@@ -144,7 +181,6 @@ class InterfaceParser(FileParser):
"given number of maximum outer iterations!"
)
sys.exit(1)
return start_list_list_completed, end_list_list_completed
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
dict_to_build = dict()
@@ -160,6 +196,32 @@ class InterfaceParser(FileParser):
self.mib_table.update(dict_to_build)
class RetvalEntry:
def __init__(
self,
name: str,
description: str,
unique_id: int,
file_name: Path,
subsystem_name: str,
):
self.name = name
self.description = description
self.unique_id = unique_id
self.file_name = file_name
self.subsystem_name = subsystem_name
def __repr__(self):
return (
f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
f"subsystem_name={self.subsystem_name!r})"
)
RetvalDictT = Dict[int, RetvalEntry]
class ReturnValueParser(FileParser):
"""
Generic return value parser.
@@ -173,32 +235,32 @@ class ReturnValueParser(FileParser):
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.obsw_root_path: Optional[str] = None
self.obsw_root_path: Optional[Path] = None
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
self.return_value_dict.update(
{
0: (
"OK",
"System-wide code for ok.",
"RETURN_OK",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
0: RetvalEntry(
name="OK",
description="System-wide code for ok.",
unique_id=0,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
self.return_value_dict.update(
{
1: (
"Failed",
"Unspecified system-wide code for failed.",
"RETURN_FAILED",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
1: RetvalEntry(
name="Failed",
description="Unspecified system-wide code for failed.",
unique_id=1,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""Former way to parse returnvalues. Not recommended anymore.
:param file_name:
:param args:
@@ -215,7 +277,7 @@ class ReturnValueParser(FileParser):
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -249,7 +311,7 @@ class ReturnValueParser(FileParser):
number_match = INVALID_IF_ID
# Try to match for a string using the new API first. Example:
# static const ReturnValue_t PACKET_TOO_LONG =
# HasReturnvaluesIF::makeReturnCode(CLASS_ID, 0);
# returnvalue::makeCode(CLASS_ID, 0);
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
@@ -263,16 +325,22 @@ class ReturnValueParser(FileParser):
full_returnvalue_string,
)
if returnvalue_match:
number_match = returnvalue_match.group(2)
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(2))
else:
number_match = returnvalue_match.group(3)
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(3))
if returnvalue_match:
description = self.__search_for_descrip_string(moving_window=moving_window)
description = "No description"
meta_list = self.__search_for_descrip_string(moving_window=moving_window)
for meta in meta_list:
if meta.type == MetaType.DESC:
description = meta.value
elif meta.type == MetaType.SKIP:
return
if number_match == INVALID_IF_ID:
LOGGER.warning(f"Invalid number match detected for file {file_name}")
LOGGER.warning(f"Match groups:")
_LOGGER.warning(f"Invalid number match detected for file {file_name}")
_LOGGER.warning("Match groups:")
for group in returnvalue_match.groups():
LOGGER.info(group)
_LOGGER.info(group)
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1),
file_name=file_name,
@@ -287,7 +355,7 @@ class ReturnValueParser(FileParser):
first_line=first_line, moving_window=moving_window
)
def __search_for_descrip_string(self, moving_window: List[str]) -> str:
def __search_for_descrip_string(self, moving_window: List[str]) -> List[MetaInfo]:
return self._search_for_descrip_string_generic(
moving_window=moving_window,
break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t",
@@ -315,38 +383,36 @@ class ReturnValueParser(FileParser):
name_match=returnvalue_match.group(1),
file_name=file_name,
description="",
number_match=returnvalue_match.group(2),
number_match=get_number_from_dec_or_hex_str(returnvalue_match.group(2)),
)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
def __handle_interfaceid_match(self, interface_id_match, file_name: str) -> bool:
def __handle_interfaceid_match(self, interface_id_match, file_name: Path) -> bool:
"""Handle a match of an interface ID definition in the code.
Returns whether the interface ID was found successfully in the IF ID header files
"""
if self.get_verbosity() == VerbosityLevels.DEBUG:
LOGGER.info(
f"Interface ID {interface_id_match.group(1)} found in {file_name}"
)
_LOGGER.info(f"Interface ID {interface_id_match.group(1)} found in {file_name}")
if_id_entry = self.interfaces.get(interface_id_match.group(1))
if if_id_entry is not None:
self.current_interface_id_entries["ID"] = if_id_entry[0]
else:
LOGGER.warning(
_LOGGER.warning(
f"Interface ID {interface_id_match.group(1)} not found in IF ID dictionary"
)
return False
self.current_interface_id_entries["Name"] = self.interfaces[
interface_id_match.group(1)
][1]
self.current_interface_id_entries["Name"] = self.interfaces[interface_id_match.group(1)][
1
].lstrip()
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
if self.get_verbosity() == VerbosityLevels.DEBUG:
current_id = self.current_interface_id_entries["ID"]
LOGGER.info(f"Current ID: {current_id}")
_LOGGER.info(f"Current ID: {current_id}")
return True
def __handle_returnvalue_match(
self, name_match: str, number_match: str, file_name: str, description: str
self, name_match: str, number_match: int, file_name: Path, description: str
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"],
@@ -354,23 +420,21 @@ class ReturnValueParser(FileParser):
MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES,
)
full_id = (
self.current_interface_id_entries["ID"] << 8
) + return_number_from_string(number_match)
full_id = (self.current_interface_id_entries["ID"] << 8) | number_match
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
dict_tuple = (
string_to_add,
description,
number_match,
file_name,
self.current_interface_id_entries["FullName"],
file_name = file_name.relative_to(self.obsw_root_path)
mib_entry = RetvalEntry(
name=string_to_add,
description=description,
unique_id=number_match,
file_name=file_name,
subsystem_name=self.current_interface_id_entries["FullName"],
)
self.return_value_dict.update({full_id: dict_tuple})
self.return_value_dict.update({full_id: mib_entry})
self.count = self.count + 1
def _post_parsing_operation(self):
@@ -379,24 +443,24 @@ class ReturnValueParser(FileParser):
self.mib_table = self.return_value_dict
@staticmethod
def export_to_file(filename: str, list_of_entries: dict, file_separator: str):
file = open(filename, "w")
for entry in list_of_entries.items():
file.write(
hex(entry[0])
+ file_separator
+ entry[1][0]
+ file_separator
+ entry[1][1]
+ file_separator
+ entry[1][2]
+ file_separator
+ entry[1][3]
+ file_separator
+ entry[1][4]
+ "\n"
)
file.close()
def export_to_csv(filename: Path, list_of_entries: RetvalDictT, column_sep: str):
with open(filename, "w") as out:
out.write(
f"Full ID (hex){column_sep} Name{column_sep} Description{column_sep} "
f"Unique ID{column_sep} Subsystem Name{column_sep} File Path\n"
)
for k, entry in sorted(list_of_entries.items()):
# entry: RetvalEntry
if column_sep == ";":
entry.description = entry.description.replace(";", ",")
elif column_sep == ",":
# Quote the description
entry.description = f'"{entry.description}"'
out.write(
f"{k:#06x}{column_sep}{entry.name}{column_sep}{entry.description}"
f"{column_sep}{entry.unique_id}{column_sep}{entry.subsystem_name}"
f"{column_sep}{entry.file_name.as_posix()}\n"
)
def build_checked_string(
self,
@@ -409,7 +473,7 @@ class ReturnValueParser(FileParser):
my_str = first_part + "_" + self.convert(second_part)
if len(my_str) > max_string_len:
if print_truncated_entries:
LOGGER.warning(f"Entry {my_str} too long. Will truncate.")
_LOGGER.warning(f"Entry {my_str} too long. Will truncate.")
my_str = my_str[0:max_string_len]
else:
# print("Entry: " + myStr + " is all right.")
@@ -434,10 +498,10 @@ class ReturnValueParser(FileParser):
return description
def return_number_from_string(a_string):
def get_number_from_dec_or_hex_str(a_string):
if a_string.startswith("0x"):
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
LOGGER.warning(f"Illegal number representation: {a_string}")
_LOGGER.warning(f"Illegal number representation: {a_string}")
return 0
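
A worked example of the returnvalue ID composition used above, with made-up values: the interface/class ID occupies the upper byte and the unique ID the lower byte.

interface_id = 0x22  # hypothetical CLASS_ID value
unique_id = get_number_from_dec_or_hex_str("0x0f")
full_id = (interface_id << 8) | unique_id
assert full_id == 0x220F
assert get_number_from_dec_or_hex_str("15") == 15
assert get_number_from_dec_or_hex_str("nonsense") == 0  # logs a warning first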

CSV writer utility module

@@ -1,3 +1,5 @@
from pathlib import Path
from fsfwgen.utility.file_management import copy_file, move_file
@@ -5,7 +7,7 @@ from fsfwgen.utility.file_management import copy_file, move_file
class CsvWriter:
def __init__(
self,
filename: str,
filename: Path,
table_to_print=None,
header_array=None,
file_separator: str = ",",
@@ -40,11 +42,11 @@ class CsvWriter:
file.write(str(entry[columnIndex]) + "\n")
file.close()
def copy_csv(self, copy_destination: str = "."):
def copy_csv(self, copy_destination: Path = "."):
copy_file(self.filename, copy_destination)
print("CSV file was copied to " + copy_destination)
print(f"CSV file was copied to {copy_destination}")
def move_csv(self, move_destination: str):
def move_csv(self, move_destination: Path):
move_file(self.filename, move_destination)
if move_destination == ".." or move_destination == "../":
print("CSV Writer: CSV file was moved to parser root directory")

fsfwgen/utility/file_management.py

@@ -1,30 +1,32 @@
# -*- coding: utf-8 -*-
import logging
import shutil
import os
from fsfwgen.core import get_console_logger
LOGGER = get_console_logger()
from pathlib import Path
def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False):
_LOGGER = logging.getLogger(__name__)
def copy_file(filename: Path, destination: Path = "", delete_existing_file: bool = False):
if not os.path.exists(filename):
LOGGER.warning(f"File {filename} does not exist")
_LOGGER.warning(f"File {filename} does not exist")
return
if not os.path.isdir(destination) and os.path.exists(destination):
if delete_existing_file:
os.remove(destination)
else:
LOGGER.warning(f"Destination file {destination} already exists")
_LOGGER.warning(f"Destination file {destination} already exists")
return
try:
shutil.copy2(src=filename, dst=destination)
except FileNotFoundError:
LOGGER.exception("File not found!")
_LOGGER.exception("File not found!")
except shutil.SameFileError:
LOGGER.exception("Source and destination are the same!")
_LOGGER.exception("Source and destination are the same!")
def move_file(file_name: str, destination: str = ""):
def move_file(file_name: Path, destination: Path = ""):
if not os.path.exists(file_name):
print(f"move_file: File {file_name} does not exist")
return
@@ -36,6 +38,6 @@ def move_file(file_name: str, destination: str = ""):
os.remove(file_name)
return
except FileNotFoundError:
LOGGER.exception("File not found!")
_LOGGER.exception("File not found!")
except shutil.SameFileError:
LOGGER.exception("Source and destination are the same!")
_LOGGER.exception("Source and destination are the same!")

fsfwgen/utility/sql_writer.py

@@ -1,8 +1,7 @@
import logging
import sqlite3
from fsfwgen.core import get_console_logger
LOGGER = get_console_logger()
_LOGGER = logging.getLogger(__name__)
class SqlWriter:
@@ -11,11 +10,11 @@ class SqlWriter:
self.conn = sqlite3.connect(self.filename)
def open(self, sql_creation_command: str):
LOGGER.info(f"SQL Writer: Opening {self.filename}")
_LOGGER.info(f"SQL Writer: Opening {self.filename}")
self.conn.execute(sql_creation_command)
def delete(self, sql_deletion_command):
LOGGER.info("SQL Writer: Deleting SQL table")
_LOGGER.info("SQL Writer: Deleting SQL table")
self.conn.execute(sql_deletion_command)
def write_entries(self, sql_insertion_command, current_entry):
@@ -24,7 +23,7 @@ class SqlWriter:
return cur.lastrowid
def commit(self):
LOGGER.info("SQL Writer: Commiting SQL table")
_LOGGER.info("SQL Writer: Commiting SQL table")
self.conn.commit()
def close(self):
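
A hedged usage sketch of the writer: the constructor signature is cut off above, so it is assumed here that it takes the database file name, and the table layout is made up.

writer = SqlWriter("example_mib.db")
writer.open("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, name TEXT)")
writer.write_entries("INSERT INTO events VALUES (?, ?)", (2205, "SOME_EVENT"))
writer.commit()
writer.close()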

lint.py (deleted, 39 lines)

@@ -1,39 +0,0 @@
#!/usr/bin/env python3
import os
import sys
def main():
exclude_dirs_flag = ""
if not os.path.exists("setup.cfg"):
exclude_dirs_flag = (
"--exclude .git,__pycache__,docs/conf.py,old,build,dist,venv"
)
additional_flags_both_steps = "--count --statistics"
additional_flags_first_step = "--select=E9,F63,F7,F82 --show-source"
flake8_first_step_cmd = (
f"flake8 . {additional_flags_both_steps} "
f"{additional_flags_first_step} {exclude_dirs_flag}"
)
status = os.system(flake8_first_step_cmd)
if os.name == "nt":
if status != 0:
print(f"Flake8 linter errors with status {status}")
else:
if os.WEXITSTATUS(status) != 0:
print(f"Flake8 linter errors with status {status}")
sys.exit(0)
additional_flags_second_step = (
'--exit-zero --max-complexity=10 --per-file-ignores="__init__.py:F401"'
)
if not os.path.exists("setup.cfg"):
additional_flags_second_step += " --max-line-length=100"
flake8_second_step_cmd = (
f"flake8 . {additional_flags_both_steps} {additional_flags_second_step}"
f" {exclude_dirs_flag}"
)
os.system(flake8_second_step_cmd)
if __name__ == "__main__":
main()

pyproject.toml

@@ -3,4 +3,42 @@ requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"
[project]
name = "fsfwgen"
description = "FSFW Generator Core"
version = "0.4.0"
license = { text = "Apache-2.0" }
authors = [
{name = "Robin Mueller", email = "robin.mueller.m@gmail.com"}
]
classifiers = [
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
"Natural Language :: English",
"Operating System :: POSIX",
"Operating System :: Microsoft :: Windows",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Topic :: Communications",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Scientific/Engineering"
]
dependencies = [
"colorlog~=6.0"
]
[project.urls]
"Homepage" = "https://egit.irs.uni-stuttgart.de/fsfw/fsfwgen"
[tool.ruff]
exclude = [
".git",
"venv",
"docs"
]
line-length = 100

setup.cfg (deleted)

@@ -1,50 +0,0 @@
[metadata]
name = fsfwgen
description = FSFW Generator Core
version = attr: fsfwgen.__version__
long_description = file: README.md, NOTICE
long_description_content_type = text/markdown
license = Apache-2.0
author = Robin Mueller
author_email = muellerr@irs.uni-stuttgart.de
platform = any
url = https://egit.irs.uni-stuttgart.de/fsfw/fsfwgen
classifiers =
Development Status :: 5 - Production/Stable
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Natural Language :: English
Operating System :: POSIX
Operating System :: Microsoft :: Windows
Programming Language :: Python :: 3
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: Communications
Topic :: Software Development :: Libraries
Topic :: Software Development :: Libraries :: Python Modules
Topic :: Scientific/Engineering
[options]
install_requires =
colorlog>=6.0.0
package_dir =
= .
packages = find:
python_requires = >=3.8
[flake8]
max-line-length = 100
ignore = D203, W503
exclude =
.git,
__pycache__,
docs/conf.py,
old,
build,
dist,
venv
max-complexity = 10
extend-ignore =
# See https://github.com/PyCQA/pycodestyle/issues/373
E203,

setup.py (deleted)

@@ -1,12 +0,0 @@
#!/usr/bin/python3
"""
We do the package handling in the static setup.cfg but include an empty setup.py
to allow editable installs https://packaging.python.org/tutorials/packaging-projects/
and provide extensibility
"""
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
setup()