10 Commits

Author SHA1 Message Date
b1e5a2d40a small tweak for updated fsfw 2022-08-24 17:26:49 +02:00
e84be4bb17 now it works again 2022-08-09 10:37:50 +02:00
fd9838bcba improved the file connector algorithm 2022-08-09 10:17:31 +02:00
9c412ace74 extended TODO 2022-07-20 11:04:53 +02:00
911aa0d89d added todo 2022-07-20 11:02:27 +02:00
a5dee6e417 improved csv format 2022-06-21 01:21:01 +02:00
a2e0c4f98e some more minor improvements 2022-06-21 00:57:01 +02:00
1b1ac86e8c more improvements and fixes for fsfwgen 2022-06-21 00:51:13 +02:00
36b44d1e26 new Path handling 2022-06-20 18:02:46 +02:00
fc191cc50e Refactor Path handling 2022-06-20 16:56:05 +02:00
10 changed files with 416 additions and 371 deletions

View File

@@ -1,73 +1,7 @@
import enum
import sys
import colorlog
import logging
import argparse
CONSOLE_LOGGER_NAME = "FSFW Generator Logger"
LOGGER_INSTANCE = None
class InfoFilter(logging.Filter):
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.INFO:
return rec.levelno
return None
class DebugFilter(logging.Filter):
"""Filter object, which is used so that only DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.DEBUG:
return rec.levelno
return None
def get_console_logger():
global LOGGER_INSTANCE
if LOGGER_INSTANCE is None:
LOGGER_INSTANCE = init_console_logger()
return LOGGER_INSTANCE
def init_console_logger():
logger = colorlog.getLogger(CONSOLE_LOGGER_NAME)
generic_format = colorlog.ColoredFormatter(
"%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s"
)
fault_format = colorlog.ColoredFormatter(
fmt="%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d "
"[%(filename)s:%(lineno)d] %(reset)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
console_info_handler = colorlog.StreamHandler(stream=sys.stdout)
console_info_handler.setLevel(logging.INFO)
console_info_handler.addFilter(InfoFilter())
console_debug_handler = logging.StreamHandler(stream=sys.stdout)
console_debug_handler.setLevel(logging.DEBUG)
console_debug_handler.addFilter(DebugFilter())
console_info_handler.setFormatter(generic_format)
console_info_handler.addFilter(InfoFilter())
console_debug_handler.addFilter(DebugFilter())
console_error_handler = logging.StreamHandler(stream=sys.stderr)
console_error_handler.setLevel(logging.WARNING)
console_error_handler.setFormatter(fault_format)
logger.addHandler(console_info_handler)
logger.addHandler(console_debug_handler)
logger.addHandler(console_error_handler)
logger.setLevel(logging.DEBUG)
return logger
class ParserTypes(enum.Enum):
EVENTS = "events"
OBJECTS = "objects"
@@ -76,8 +10,6 @@ class ParserTypes(enum.Enum):
def init_printout(project_string: str):
global LOGGER_INSTANCE
LOGGER_INSTANCE = get_console_logger()
print(f"-- {project_string} MIB Generator --")

View File

@@ -1,16 +1,14 @@
import re
import os
from typing import List
import logging
from pathlib import Path
from typing import List, Optional, Dict
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger
from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger()
EVENT_ENTRY_NAME_IDX = 0
EVENT_ENTRY_SEVERITY_IDX = 1
EVENT_ENTRY_INFO_IDX = 2
EVENT_SOURCE_FILE_IDX = 3
FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"'
DEFAULT_MOVING_WINDOWS_SIZE = 7
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
@@ -49,35 +47,44 @@ class SubsystemDefinitionParser(FileParser):
pass
class EventEntry:
def __init__(self, name: str, severity: str, description: str, file_name: Path):
self.name = name
self.severity = severity
self.description = description
self.file_name = file_name
def __repr__(self):
return (
f"EventEntry(name={self.name!r}, severity={self.severity!r}, "
f"description={self.description!r}, file_name={self.file_name!r}"
)
EventDictT = Dict[int, EventEntry]
class EventParser(FileParser):
def __init__(self, file_list: List[str], interface_list):
def __init__(
self, file_list: List[Path], interface_list, moving_window_size: int = 7
):
super().__init__(file_list)
self.set_moving_window_mode(moving_window_size)
self.interfaces = interface_list
self.count = 0
self.my_id = 0
self.current_id = 0
self.obsw_root_path = None
self.mib_table: EventDictT = dict()
self.obsw_root_path: Optional[Path] = None
self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
try:
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
total_count = 0
for line in all_lines:
self.__handle_line_reading(line, file_name)
if self.count > 0:
print("File " + file_name + " contained " + str(self.count) + " events.")
total_count += self.count
self.count = 0
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
logging.warning("Regular file parsing mode not implemented")
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -127,7 +134,7 @@ class EventParser(FileParser):
)
def __handle_event_match(
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
self, event_match, macro_api_match: bool, moving_window: list, file_name: Path
):
if ";" in event_match.group(0):
event_full_match = self.__generate_regex_event_match(
@@ -158,14 +165,16 @@ class EventParser(FileParser):
)
severity = event_full_match.group(4)
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
file_name = file_name.relative_to(self.obsw_root_path)
if self.mib_table.get(full_id) is not None:
LOGGER.warning(f"Duplicate event ID {full_id} detected")
LOGGER.info(
f"Name: {self.mib_table.get(full_id)[0]}| "
f"Description: {self.mib_table.get(full_id)[2]}"
f"Name: {self.mib_table.get(full_id).name}| "
f"Description: {self.mib_table.get(full_id).description}"
)
self.mib_table.update({full_id: (name, severity, description, file_name)})
self.mib_table.update(
{full_id: EventEntry(name, severity, description, file_name)}
)
self.count = self.count + 1
@staticmethod
@@ -194,56 +203,11 @@ class EventParser(FileParser):
def _post_parsing_operation(self):
pass
def __handle_line_reading(self, line, file_name: str):
if not self.last_lines[0] == "\n":
twolines = self.last_lines[0] + " " + line.strip()
else:
twolines = ""
match1 = re.search(
r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
)
if match1:
self.current_id = self.interfaces[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
self.my_id = self.return_number_from_string(self.current_id)
match = re.search(
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
twolines,
)
if match:
if match.group(1):
self.last_lines[0] = line
return
description = " "
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
full_id = (self.my_id * 100) + self.return_number_from_string(
match.group(3)
)
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
# Replace backslashes with regular slashes
file_name.replace("\\", "/")
self.mib_table.update(
{full_id: (string_to_add, severity, description, file_name)}
)
self.count = self.count + 1
self.last_lines[0] = line
def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part)
if len(my_str) > 16:
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
my_str = my_str[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return my_str
@staticmethod
@@ -275,92 +239,76 @@ class EventParser(FileParser):
return description
def export_to_file(filename: str, event_list: list, file_separator: str):
file = open(filename, "w")
for entry in event_list:
event_id = int(entry[0])
event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}"
file.write(
str(event_id)
+ file_separator
+ event_id_as_hex
+ file_separator
+ event_value[EVENT_ENTRY_NAME_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_SEVERITY_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_INFO_IDX]
+ file_separator
+ event_value[EVENT_SOURCE_FILE_IDX]
+ "\n"
def export_to_csv(filename: Path, event_list: EventDictT, col_sep: str):
with open(filename, "w") as out:
fsep = col_sep
out.write(
f"Event ID (dec){col_sep} Event ID (hex){col_sep} Name{col_sep} "
f"Severity{col_sep} Description{col_sep} File Path\n"
)
file.close()
return
for entry in event_list.items():
event_id = int(entry[0])
event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}"
out.write(
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}"
f"{event_value.severity}{fsep}{event_value.description}"
f"{fsep}{event_value.file_name.as_posix()}\n"
)
def write_translation_source_file(
event_list: list, date_string: str, filename: str = "translateEvents.cpp"
event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp"
):
outputfile = open(filename, "w")
definitions = ""
# Look up table to avoid duplicate events
lut = dict()
function = (
"const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
)
for entry in event_list:
event_id = entry[0]
event_value = entry[1]
if event_value[EVENT_ENTRY_NAME_IDX] not in lut:
definitions += (
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING "
f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n'
)
function += (
f" case ({event_id}):\n "
f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
)
lut.update({event_value[EVENT_ENTRY_NAME_IDX] : event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n'
outputfile.write(
f"/**\n * @brief Auto-generated event translation file. "
f"Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
outputfile.write('#include "translateEvents.h"\n\n')
outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n")
outputfile.close()
with open(filename, "w") as out:
definitions = ""
# Look up table to avoid duplicate events
lut = dict()
function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
for entry in event_list.items():
event_id = entry[0]
event_value = entry[1]
name = event_value.name
if name not in lut:
definitions += f"const char *{name}_STRING " f'= "{name}";\n'
function += f" case ({event_id}):\n " f"return {name}_STRING;\n"
lut.update({name: event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n'
out.write(
f"/**\n * @brief Auto-generated event translation file. "
f"Contains {len(event_list)} translations.\n"
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
out.write('#include "translateEvents.h"\n\n')
out.write(definitions + "\n" + function + " }\n return 0;\n}\n")
def write_translation_header_file(filename: str = "translateEvents.h"):
file = open(filename, "w")
file.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char *translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def write_translation_header_file(filename: Path = "translateEvents.h"):
with open(filename, "w") as out:
out.write(
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
f"const char *translateEvents(Event event);\n\n"
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
)
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
def handle_csv_export(file_name: Path, event_list: EventDictT, file_separator: str):
"""
Generates the CSV in the same directory as the .py file and copes the CSV to another
directory if specified.
"""
export_to_file(
filename=file_name, event_list=event_list, file_separator=file_separator
)
export_to_csv(filename=file_name, event_list=event_list, col_sep=file_separator)
def handle_cpp_export(
event_list: list,
event_list: EventDictT,
date_string: str,
file_name: str = "translateEvents.cpp",
file_name: Path = "translateEvents.cpp",
generate_header: bool = True,
header_file_name: str = "translateEvents.h",
header_file_name: Path = "translateEvents.h",
):
write_translation_source_file(
event_list=event_list, date_string=date_string, filename=file_name

97
fsfwgen/logging.py Normal file
View File

@@ -0,0 +1,97 @@
import logging
import sys
from typing import Optional
from colorlog import ColoredFormatter
FSFWGEN_LOGGER_NAME = "fsfwgen"
__CONSOLE_LOGGER_SET_UP: Optional[logging.Logger] = None
def get_console_logger() -> logging.Logger:
global __CONSOLE_LOGGER_SET_UP
"""Get the global console logger instance. Error logs will still be saved to an error file
"""
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
if not __CONSOLE_LOGGER_SET_UP:
__CONSOLE_LOGGER_SET_UP = True
__setup_tmtc_console_logger()
return logger
def init_console_logger(log_level: int = logging.INFO) -> logging.Logger:
global __CONSOLE_LOGGER_SET_UP
if not __CONSOLE_LOGGER_SET_UP:
__CONSOLE_LOGGER_SET_UP = True
return __setup_tmtc_console_logger(log_level=log_level)
return get_console_logger()
def __setup_tmtc_console_logger(log_level: int = logging.INFO) -> logging.Logger:
"""Sets the LOGGER object which will be used globally. This needs to be called before
using the logger.
:return: Returns the instance of the global logger
"""
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
# Use colorlog for now because it allows more flexibility and custom messages
# for different levels
set_up_colorlog_logger(logger=logger)
logger.setLevel(level=log_level)
# set_up_coloredlogs_logger(logger=logger)
return logger
# Custom formatter. Allows different strings for info, error and debug output
class CustomTmtccmdFormatter(ColoredFormatter):
def __init__(
self, info_fmt: str, dbg_fmt: str, err_fmt: str, warn_fmt: str, datefmt=None
):
self.err_fmt = err_fmt
self.info_fmt = info_fmt
self.dbg_fmt = dbg_fmt
self.warn_fmt = warn_fmt
super().__init__(fmt="%(levelno)d: %(msg)s", datefmt=datefmt, style="%")
def format(self, record):
# Save the original format configured by the user
# when the logger formatter was instantiated
format_orig = self._style._fmt
# Replace the original format with one customized by logging level
if record.levelno == logging.DEBUG:
self._style._fmt = self.dbg_fmt
elif record.levelno == logging.INFO:
self._style._fmt = self.info_fmt
elif record.levelno == logging.ERROR:
self._style._fmt = self.err_fmt
elif record.levelno == logging.WARNING:
self._style._fmt = self.warn_fmt
# Call the original formatter class to do the grunt work
result = logging.Formatter.format(self, record)
# Restore the original format configured by the user
self._style._fmt = format_orig
return result
def set_up_colorlog_logger(logger: logging.Logger):
from colorlog import StreamHandler
dbg_fmt = "%(log_color)s%(levelname)-8s %(cyan)s [%(filename)s:%(lineno)d] %(reset)s%(message)s"
custom_formatter = CustomTmtccmdFormatter(
info_fmt="%(log_color)s%(levelname)-8s %(cyan)s %(reset)s%(message)s",
dbg_fmt=dbg_fmt,
err_fmt=dbg_fmt,
warn_fmt=dbg_fmt,
)
console_handler = StreamHandler(stream=sys.stdout)
console_handler.setFormatter(custom_formatter)
logger.addHandler(console_handler)
logger.propagate = False

View File

@@ -1,7 +1,9 @@
import re
from pathlib import Path
from typing import List
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger
from fsfwgen.logging import get_console_logger
from fsfwgen.utility.sql_writer import SqlWriter
@@ -9,10 +11,10 @@ LOGGER = get_console_logger()
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: list):
def __init__(self, file_list: List[Path]):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
@@ -21,7 +23,7 @@ class ObjectDefinitionParser(FileParser):
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -42,28 +44,27 @@ def export_object_file(filename, object_list, file_separator: str = ","):
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
outputfile = open(filename, "w")
LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = (
"const char *translateObject(object_id_t object) "
"{\n switch ((object & 0xFFFFFFFF)) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
outputfile.write(
f"/**\n * @brief Auto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
outputfile.write('#include "translateObjects.h"\n\n')
outputfile.write(definitions + "\n" + function + " return 0;\n}\n")
outputfile.close()
with open(filename, "w") as out:
LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = (
"const char *translateObject(object_id_t object) "
"{\n switch ((object & 0xFFFFFFFF)) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
out.write(
f"/**\n * @brief Auto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
out.write('#include "translateObjects.h"\n\n')
out.write(definitions + "\n" + function + " return 0;\n}\n")
def write_translation_header_file(filename: str = "translateObjects.h"):

View File

@@ -1,11 +1,11 @@
"""Generic File Parser class
Used by parse header files. Implemented as class in case header parser becomes more complex
"""
import os
import re
from typing import Union
from pathlib import Path
from typing import Union, List
from fsfwgen.core import get_console_logger
from fsfwgen.logging import get_console_logger
from logging import DEBUG
LOGGER = get_console_logger()
@@ -17,11 +17,11 @@ class FileListParser:
TODO: Filter functionality for each directory to filter out files or folders
"""
def __init__(self, directory_list_or_name: Union[str, list]):
def __init__(self, directory_list_or_name: Union[Path, List[Path]]):
self.directory_list = []
if isinstance(directory_list_or_name, str):
if isinstance(directory_list_or_name, Path):
self.directory_list.append(directory_list_or_name)
elif isinstance(directory_list_or_name, list):
elif isinstance(directory_list_or_name, List):
self.directory_list.extend(directory_list_or_name)
else:
LOGGER.warning(
@@ -34,7 +34,7 @@ class FileListParser:
search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False,
):
) -> List[Path]:
"""This function is called to get a list of header files
:param search_recursively:
:param printout_string:
@@ -52,30 +52,21 @@ class FileListParser:
def __get_header_file_list(
self,
base_directory: str,
base_directory: Path,
seach_recursively: bool = False,
print_current_dir: bool = False,
):
if base_directory[-1] != "/":
base_directory += "/"
local_header_files = []
if print_current_dir:
print("Parsing header files in: " + base_directory)
base_list = os.listdir(base_directory)
# g.PP.pprint(base_list)
for entry in base_list:
header_file_match = re.match(r"[_.]*.*\.h", entry)
if header_file_match:
if os.path.isfile(base_directory + entry):
match_string = header_file_match.group(0)
if match_string[0] == "." or match_string[0] == "_":
pass
else:
local_header_files.append(base_directory + entry)
print(f"Parsing header files in: {base_directory}")
for entry in base_directory.iterdir():
if (
entry.is_file()
and entry.suffix == ".h"
and entry.as_posix()[0] not in [".", "_"]
):
local_header_files.append(entry)
if seach_recursively:
next_path = base_directory + entry
if os.path.isdir(next_path):
self.__get_header_file_list(next_path, seach_recursively)
# print("Files found in: " + base_directory)
# g.PP.pprint(local_header_files)
if entry.is_dir():
self.__get_header_file_list(entry, seach_recursively)
self.header_files.extend(local_header_files)

View File

@@ -14,7 +14,9 @@ Child classes fill out the MIB table (self.mib_table)
import enum
import re
from abc import abstractmethod
from pathlib import Path
from typing import Dict, List
from enum import Enum, auto
class VerbosityLevels(enum.Enum):
@@ -23,9 +25,9 @@ class VerbosityLevels(enum.Enum):
DEBUG = 2
class FileParserModes(enum.Enum):
REGULAR = (enum.auto(),)
MOVING_WINDOW = enum.auto()
class FileParserModes(Enum):
REGULAR = 1
MOVING_WINDOW = 2
class FileParser:
@@ -39,7 +41,7 @@ class FileParser:
will be passed through to the abstract function implementations.
"""
def __init__(self, file_list):
def __init__(self, file_list: List[Path]):
if len(file_list) == 0:
print("File list is empty !")
self.file_list_empty = True
@@ -111,7 +113,7 @@ class FileParser:
return self.mib_table
@abstractmethod
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""
Implemented by child class. The developer should fill the info table (self.mib_table)
in this routine
@@ -125,7 +127,7 @@ class FileParser:
@abstractmethod
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -151,7 +153,7 @@ class FileParser:
:return:
"""
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
def __parse_file_with_moving_window(self, file_name: Path, *args, **kwargs):
all_lines = self._open_file(file_name=file_name)
moving_window_size = self.__parser_args
if moving_window_size == 0:
@@ -212,7 +214,7 @@ class FileParser:
pass
@staticmethod
def _open_file(file_name: str) -> list:
def _open_file(file_name: Path) -> list:
"""
Open a file, attempting common encodings utf-8 and cp1252
:param file_name:
@@ -222,7 +224,7 @@ class FileParser:
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
print("ReturnValueParser: Decoding error with file " + file_name)
print(f"Parser: Decoding error with file {file_name}")
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
return all_lines
@@ -261,9 +263,7 @@ class FileParser:
descrip_match = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]
)
if descrip_match:
break
if current_idx <= 0:
if descrip_match or current_idx <= 0:
break
current_idx -= 1
if descrip_match:

View File

@@ -1,11 +1,12 @@
import os.path
import re
import sys
from typing import List, Tuple, Optional
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Union
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels
from fsfwgen.utility.printer import PrettyPrinter
from fsfwgen.core import get_console_logger
from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger()
@@ -20,14 +21,34 @@ DEFAULT_MOVING_WINDOWS_SIZE = 7
INVALID_IF_ID = -1
@dataclass
class FileStartHelper:
start_name: str
start_name_or_value: str
count: int
cumulative_start_index: Optional[int] = None
@dataclass
class FileEndHelper:
end_name: str
cumulative_end_value: Optional[int] = None
@dataclass
class FileConnHelper:
file_name: str
sh: Optional[FileStartHelper]
eh: Optional[FileEndHelper]
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
self.file_table_list = []
self.file_name_table = []
self.start_name_list = []
self.end_name_list = []
self.file_conn_helpers: Optional[List[FileConnHelper]] = None
self._debug_mode = False
def enable_debug_mode(self, enable: bool):
@@ -60,23 +81,28 @@ class InterfaceParser(FileParser):
start_matched = False
end_matched = False
start_name = ""
target_end_name = ""
first_entry_name_or_index = ""
file_conn_entry = FileConnHelper(file_name, None, None)
for line in all_lines:
if not start_matched:
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
target_end_name = match.group(2)
first_entry_name_or_index = match.group(2)
start_matched = True
else:
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
if match:
count += 1
# It is expected that the last entry is explicitely marked like this.
# TODO: Could also simply remember last entry and then designate that as end
# entry as soon as "}" is found. Requires moving window mode though
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
last_entry_name = match.group(1)
end_matched = True
self.end_name_list.append([last_entry_name, None])
file_conn_entry.eh = FileEndHelper(last_entry_name, None)
break
else:
short_name = match.group(2)
if short_name == "":
@@ -94,48 +120,68 @@ class InterfaceParser(FileParser):
"Make sure to use [EXPORT] : [END]"
)
sys.exit(1)
self.start_name_list.append([start_name, target_end_name, None, count])
file_conn_entry.sh = FileStartHelper(
start_name, first_entry_name_or_index, count, None
)
if self.file_conn_helpers is None:
self.file_conn_helpers = []
self.file_conn_helpers.append(file_conn_entry)
self.file_name_table.append(file_name)
self.file_table_list.append(current_file_table)
def _post_parsing_operation(self):
self.start_name_list, self.end_name_list = self.__assign_start_end_indexes(
self.start_name_list, self.end_name_list
)
self.__assign_start_end_indexes()
self._print_start_end_info()
for idx, file_table in enumerate(self.file_table_list):
self.__build_mod_interface_table(self.start_name_list[idx][2], file_table)
self.__build_mod_interface_table(
self.file_conn_helpers[idx].sh.cumulative_start_index, file_table
)
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
@staticmethod
def __assign_start_end_indexes(
start_name_list_list, end_name_list_list
) -> Tuple[List, List]:
start_list_list_completed = start_name_list_list
end_list_list_completed = end_name_list_list
def _print_start_end_info(self):
for conn_helper in self.file_conn_helpers:
print(
f"Detected {conn_helper.sh.count} entries in {conn_helper.file_name}, "
f"end index {conn_helper.eh.cumulative_end_value}"
)
def __assign_start_end_indexes(self):
conn_helpers_old = self.file_conn_helpers.copy()
all_indexes_filled = False
max_outer_iterations = 15
current_iteration = 0
while not all_indexes_filled:
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[1].isdigit():
start_list_list_completed[idx][2] = int(start_name_list[1])
end_list_list_completed[idx][1] = (
start_list_list_completed[idx][2]
+ start_list_list_completed[idx][3]
for idx, conn_helper in enumerate(conn_helpers_old):
sh = conn_helper.sh
# In the very first file, the first index might/will be a number
if sh.start_name_or_value.isdigit():
sh.cumulative_start_index = int(sh.start_name_or_value)
conn_helpers_old[idx].eh.cumulative_end_value = (
sh.cumulative_start_index + sh.count
)
target_end_name = start_name_list[1]
for end_name_list in end_list_list_completed:
end_name = end_name_list[0]
end_value = end_name_list[1]
if end_name == target_end_name and end_value is not None:
start_list_list_completed[idx][2] = end_value
end_list_list_completed[idx][1] = (
end_value + start_list_list_completed[idx][3]
# Now, we try to connect the start and end of the files using the start and end
# names respectively
end_name_to_search = conn_helper.sh.start_name_or_value
for end_name_helper in conn_helpers_old:
eh = end_name_helper.eh
if (
eh.end_name == end_name_to_search
and eh.cumulative_end_value is not None
):
self.file_conn_helpers[
idx
].sh.cumulative_start_index = eh.cumulative_end_value
self.file_conn_helpers[idx].eh.cumulative_end_value = (
eh.cumulative_end_value
+ self.file_conn_helpers[idx].sh.count
)
all_indexes_filled = True
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[2] is None or end_name_list_list[idx][1] is None:
for idx, conn_helper in enumerate(conn_helpers_old):
if (
conn_helper.sh.cumulative_start_index is None
or conn_helper.eh.cumulative_end_value is None
):
all_indexes_filled = False
current_iteration += 1
if current_iteration >= max_outer_iterations:
@@ -144,7 +190,6 @@ class InterfaceParser(FileParser):
"given number of maximum outer iterations!"
)
sys.exit(1)
return start_list_list_completed, end_list_list_completed
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
dict_to_build = dict()
@@ -160,6 +205,32 @@ class InterfaceParser(FileParser):
self.mib_table.update(dict_to_build)
class RetvalEntry:
def __init__(
self,
name: str,
description: str,
unique_id: int,
file_name: Path,
subsystem_name: str,
):
self.name = name
self.description = description
self.unique_id = unique_id
self.file_name = file_name
self.subsystem_name = subsystem_name
def __repr__(self):
return (
f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
f"subsystem_name={self.subsystem_name!r})"
)
RetvalDictT = Dict[int, RetvalEntry]
class ReturnValueParser(FileParser):
"""
Generic return value parser.
@@ -173,32 +244,32 @@ class ReturnValueParser(FileParser):
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.obsw_root_path: Optional[str] = None
self.obsw_root_path: Optional[Path] = None
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
self.return_value_dict.update(
{
0: (
"OK",
"System-wide code for ok.",
"RETURN_OK",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
0: RetvalEntry(
name="OK",
description="System-wide code for ok.",
unique_id=0,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
self.return_value_dict.update(
{
1: (
"Failed",
"Unspecified system-wide code for failed.",
"RETURN_FAILED",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
1: RetvalEntry(
name="Failed",
description="Unspecified system-wide code for failed.",
unique_id=1,
file_name=Path("fsfw/returnvalues/returnvalue.h"),
subsystem_name="HasReturnvaluesIF",
)
}
)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
"""Former way to parse returnvalues. Not recommended anymore.
:param file_name:
:param args:
@@ -215,7 +286,7 @@ class ReturnValueParser(FileParser):
def _handle_file_parsing_moving_window(
self,
file_name: str,
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
@@ -249,7 +320,7 @@ class ReturnValueParser(FileParser):
number_match = INVALID_IF_ID
# Try to match for a string using the new API first. Example:
# static const ReturnValue_t PACKET_TOO_LONG =
# HasReturnvaluesIF::makeReturnCode(CLASS_ID, 0);
# returnvalue::makeCode(CLASS_ID, 0);
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
@@ -263,9 +334,11 @@ class ReturnValueParser(FileParser):
full_returnvalue_string,
)
if returnvalue_match:
number_match = returnvalue_match.group(2)
number_match = get_number_from_dec_or_hex_str(
returnvalue_match.group(2)
)
else:
number_match = returnvalue_match.group(3)
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(3))
if returnvalue_match:
description = self.__search_for_descrip_string(moving_window=moving_window)
if number_match == INVALID_IF_ID:
@@ -315,12 +388,12 @@ class ReturnValueParser(FileParser):
name_match=returnvalue_match.group(1),
file_name=file_name,
description="",
number_match=returnvalue_match.group(2),
number_match=get_number_from_dec_or_hex_str(returnvalue_match.group(2)),
)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
def __handle_interfaceid_match(self, interface_id_match, file_name: str) -> bool:
def __handle_interfaceid_match(self, interface_id_match, file_name: Path) -> bool:
"""Handle a match of an interface ID definition in the code.
Returns whether the interface ID was found successfully in the IF ID header files
"""
@@ -338,7 +411,7 @@ class ReturnValueParser(FileParser):
return False
self.current_interface_id_entries["Name"] = self.interfaces[
interface_id_match.group(1)
][1]
][1].lstrip()
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
if self.get_verbosity() == VerbosityLevels.DEBUG:
current_id = self.current_interface_id_entries["ID"]
@@ -346,7 +419,7 @@ class ReturnValueParser(FileParser):
return True
def __handle_returnvalue_match(
self, name_match: str, number_match: str, file_name: str, description: str
self, name_match: str, number_match: int, file_name: Path, description: str
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"],
@@ -354,23 +427,21 @@ class ReturnValueParser(FileParser):
MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES,
)
full_id = (
self.current_interface_id_entries["ID"] << 8
) + return_number_from_string(number_match)
full_id = (self.current_interface_id_entries["ID"] << 8) | number_match
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path)
dict_tuple = (
string_to_add,
description,
number_match,
file_name,
self.current_interface_id_entries["FullName"],
file_name = file_name.relative_to(self.obsw_root_path)
mib_entry = RetvalEntry(
name=string_to_add,
description=description,
unique_id=number_match,
file_name=file_name,
subsystem_name=self.current_interface_id_entries["FullName"],
)
self.return_value_dict.update({full_id: dict_tuple})
self.return_value_dict.update({full_id: mib_entry})
self.count = self.count + 1
def _post_parsing_operation(self):
@@ -379,24 +450,23 @@ class ReturnValueParser(FileParser):
self.mib_table = self.return_value_dict
@staticmethod
def export_to_file(filename: str, list_of_entries: dict, file_separator: str):
file = open(filename, "w")
for entry in list_of_entries.items():
file.write(
hex(entry[0])
+ file_separator
+ entry[1][0]
+ file_separator
+ entry[1][1]
+ file_separator
+ entry[1][2]
+ file_separator
+ entry[1][3]
+ file_separator
+ entry[1][4]
+ "\n"
def export_to_csv(filename: Path, list_of_entries: RetvalDictT, column_sep: str):
with open(filename, "w") as out:
out.write(
f"Full ID (hex){column_sep} Name{column_sep} Description{column_sep} "
f"Unique ID{column_sep} Subsytem Name{column_sep} File Path\n"
)
file.close()
for entry in list_of_entries.items():
if column_sep == ";":
entry[1].description = entry[1].description.replace(";", ",")
elif column_sep == ",":
# Quote the description
entry[1].description = f'"{entry[1].description}"'
out.write(
f"{entry[0]:#06x}{column_sep}{entry[1].name}{column_sep}{entry[1].description}"
f"{column_sep}{entry[1].unique_id}{column_sep}{entry[1].subsystem_name}"
f"{column_sep}{entry[1].file_name.as_posix()}\n"
)
def build_checked_string(
self,
@@ -434,7 +504,7 @@ class ReturnValueParser(FileParser):
return description
def return_number_from_string(a_string):
def get_number_from_dec_or_hex_str(a_string):
if a_string.startswith("0x"):
return int(a_string, 16)
if a_string.isdigit():

View File

@@ -1,3 +1,5 @@
from pathlib import Path
from fsfwgen.utility.file_management import copy_file, move_file
@@ -5,7 +7,7 @@ from fsfwgen.utility.file_management import copy_file, move_file
class CsvWriter:
def __init__(
self,
filename: str,
filename: Path,
table_to_print=None,
header_array=None,
file_separator: str = ",",
@@ -40,11 +42,11 @@ class CsvWriter:
file.write(str(entry[columnIndex]) + "\n")
file.close()
def copy_csv(self, copy_destination: str = "."):
def copy_csv(self, copy_destination: Path = "."):
copy_file(self.filename, copy_destination)
print("CSV file was copied to " + copy_destination)
print(f"CSV file was copied to {copy_destination}")
def move_csv(self, move_destination: str):
def move_csv(self, move_destination: Path):
move_file(self.filename, move_destination)
if move_destination == ".." or move_destination == "../":
print("CSV Writer: CSV file was moved to parser root directory")

View File

@@ -1,12 +1,16 @@
# -*- coding: utf-8 -*-
import shutil
import os
from fsfwgen.core import get_console_logger
from pathlib import Path
from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger()
def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False):
def copy_file(
filename: Path, destination: Path = "", delete_existing_file: bool = False
):
if not os.path.exists(filename):
LOGGER.warning(f"File {filename} does not exist")
return
@@ -24,7 +28,7 @@ def copy_file(filename: str, destination: str = "", delete_existing_file: bool =
LOGGER.exception("Source and destination are the same!")
def move_file(file_name: str, destination: str = ""):
def move_file(file_name: Path, destination: Path = ""):
if not os.path.exists(file_name):
print(f"move_file: File {file_name} does not exist")
return

View File

@@ -1,6 +1,6 @@
import sqlite3
from fsfwgen.core import get_console_logger
from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger()