improved logger and applied black formatter

This commit is contained in:
Robin Müller 2022-02-26 13:16:09 +01:00
parent 24fa9a3fe3
commit 1be773a20f
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
10 changed files with 396 additions and 213 deletions

62
core.py
View File

@ -1,14 +1,33 @@
import enum
import sys
import colorlog
import logging
import argparse
CONSOLE_LOGGER_NAME = 'FSFW Generator Logger'
CONSOLE_LOGGER_NAME = "FSFW Generator Logger"
LOGGER_INSTANCE = None
class InfoFilter(logging.Filter):
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.INFO:
return rec.levelno
return None
class DebugFilter(logging.Filter):
"""Filter object, which is used so that only DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.DEBUG:
return rec.levelno
return None
def get_console_logger():
global LOGGER_INSTANCE
if LOGGER_INSTANCE is None:
@ -17,35 +36,48 @@ def get_console_logger():
def init_console_logger():
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)-8s | %(reset)s%(message)s'
))
logger = colorlog.getLogger(CONSOLE_LOGGER_NAME)
logger.addHandler(handler)
generic_format = colorlog.ColoredFormatter(
"%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s"
)
console_info_handler = colorlog.StreamHandler(stream=sys.stdout)
console_info_handler.setLevel(logging.INFO)
console_info_handler.addFilter(InfoFilter())
console_debug_handler = logging.StreamHandler(stream=sys.stdout)
console_debug_handler.setLevel(logging.DEBUG)
console_debug_handler.addFilter(DebugFilter())
console_info_handler.setFormatter(generic_format)
console_info_handler.addFilter(InfoFilter())
console_debug_handler.addFilter(DebugFilter())
logger.addHandler(console_info_handler)
logger.addHandler(console_debug_handler)
logger.setLevel(logging.DEBUG)
return logger
class ParserTypes(enum.Enum):
EVENTS = 'events',
OBJECTS = 'objects',
RETVALS = 'returnvalues',
SUBSERVICES = 'subservices'
EVENTS = ("events",)
OBJECTS = ("objects",)
RETVALS = ("returnvalues",)
SUBSERVICES = "subservices"
def init_printout(project_string: str):
global LOGGER_INSTANCE
LOGGER_INSTANCE = get_console_logger()
print(f'-- {project_string} MOD Generator --')
print(f"-- {project_string} MOD Generator --")
def return_generic_args_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser('Arguments for FSFW MOD generation')
parser = argparse.ArgumentParser("Arguments for FSFW MOD generation")
choices = ("events", "objects", "returnvalues", "subservices")
parser.add_argument(
'type', metavar='type', choices=choices,
help=f'Type of MOD data to generate. Choices: {choices}'
"type",
metavar="type",
choices=choices,
help=f"Type of MOD data to generate. Choices: {choices}",
)
return parser

View File

@ -25,14 +25,22 @@ class SubsystemDefinitionParser(FileParser):
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r")
for line in file.readlines():
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', line)
match = re.search(r"([A-Z0-9_]*) = ([0-9]{1,3})", line)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', moving_window[self.moving_window_center_idx])
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
match = re.search(
r"([A-Z0-9_]*) = ([0-9]{1,3})", moving_window[self.moving_window_center_idx]
)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
@ -52,10 +60,10 @@ class EventParser(FileParser):
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
try:
file = open(file_name, 'r', encoding='utf-8')
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
total_count = 0
for line in all_lines:
@ -66,17 +74,25 @@ class EventParser(FileParser):
self.count = 0
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
subsystem_id_assignment_match = re.search(
rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);",
moving_window[self.moving_window_center_idx]
moving_window[self.moving_window_center_idx],
)
if subsystem_id_assignment_match:
# For now, it is assumed that there is only going to be one subsystem ID per
# class / source file
try:
self.current_id = self.interfaces[subsystem_id_assignment_match.group(2)][0]
self.current_id = self.interfaces[
subsystem_id_assignment_match.group(2)
][0]
self.my_id = self.return_number_from_string(self.current_id)
except KeyError as e:
print(f"Key not found: {e}")
@ -84,30 +100,33 @@ class EventParser(FileParser):
# Now try to look for event definitions. Moving windows allows multi line event definitions
# These two variants need to be checked
event_match = re.match(
r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*event::makeEvent[^\n]*",
moving_window[self.moving_window_center_idx]
r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=",
moving_window[self.moving_window_center_idx],
)
macro_api_match = False
if not event_match:
event_match = re.match(
r"[\s]*static[\s]*const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*MAKE_EVENT[^\n]*",
moving_window[self.moving_window_center_idx]
)
if event_match:
for idx in range(0, 3):
if "MAKE_EVENT" in moving_window[self.moving_window_center_idx + idx]:
macro_api_match = True
break
elif "makeEvent" in moving_window[self.moving_window_center_idx + idx]:
break
else:
event_match = False
if event_match:
self.__handle_event_match(
event_match=event_match, macro_api_match=macro_api_match,
moving_window=moving_window, file_name=file_name
event_match=event_match,
macro_api_match=macro_api_match,
moving_window=moving_window,
file_name=file_name,
)
def __handle_event_match(
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
):
if ";" in event_match.group(0):
event_full_match = self.__generate_regex_event_match(
macro_api_match=macro_api_match,
full_string=moving_window[self.moving_window_center_idx]
full_string=moving_window[self.moving_window_center_idx],
)
else:
multi_line_string = self.__build_multi_line_event_string(
@ -117,17 +136,20 @@ class EventParser(FileParser):
macro_api_match=macro_api_match, full_string=multi_line_string
)
description = self._search_for_descrip_string_generic(
moving_window=moving_window, break_pattern=r'[\s]*static const(?:expr)?[\s]*Event[\s]*'
moving_window=moving_window,
break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*",
)
if event_full_match:
name = event_match.group(EVENT_NAME_IDX)
if macro_api_match:
full_id = (self.my_id * 100) + self.return_number_from_string(
event_full_match.group(2))
event_full_match.group(2)
)
severity = event_full_match.group(3)
else:
full_id = (self.my_id * 100) + self.return_number_from_string(
event_full_match.group(3))
event_full_match.group(3)
)
severity = event_full_match.group(4)
self.mib_table.update({full_id: (name, severity, description, file_name)})
self.count = self.count + 1
@ -136,18 +158,20 @@ class EventParser(FileParser):
def __generate_regex_event_match(macro_api_match: bool, full_string: str):
if macro_api_match:
# One line event definition.
regex_string = \
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
regex_string = (
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
)
else:
regex_string = \
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
regex_string = (
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;"
)
event_full_match = re.search(regex_string, full_string)
return event_full_match
def __build_multi_line_event_string(
self, first_line: str, moving_window: List[str]
self, first_line: str, moving_window: List[str]
) -> str:
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
@ -157,18 +181,21 @@ class EventParser(FileParser):
pass
def __handle_line_reading(self, line, file_name):
if not self.last_lines[0] == '\n':
twolines = self.last_lines[0] + ' ' + line.strip()
if not self.last_lines[0] == "\n":
twolines = self.last_lines[0] + " " + line.strip()
else:
twolines = ''
match1 = re.search(r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines)
twolines = ""
match1 = re.search(
r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
)
if match1:
self.current_id = self.interfaces[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
self.my_id = self.return_number_from_string(self.current_id)
match = re.search(
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", twolines
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
twolines,
)
if match:
if match.group(1):
@ -178,13 +205,17 @@ class EventParser(FileParser):
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
full_id = (self.my_id * 100) + self.return_number_from_string(match.group(3))
full_id = (self.my_id * 100) + self.return_number_from_string(
match.group(3)
)
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
self.mib_table.update({full_id: (string_to_add, severity, description, file_name)})
self.mib_table.update(
{full_id: (string_to_add, severity, description, file_name)}
)
self.count = self.count + 1
self.last_lines[0] = line
@ -199,18 +230,18 @@ class EventParser(FileParser):
@staticmethod
def return_number_from_string(a_string):
if a_string.startswith('0x'):
if a_string.startswith("0x"):
return int(a_string, 16)
elif a_string.isdigit():
return int(a_string)
else:
print('EventParser: Illegal number representation: ' + a_string)
print("EventParser: Illegal number representation: " + a_string)
return 0
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
single_strings = name.split("_")
new_string = ""
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
@ -219,10 +250,10 @@ class EventParser(FileParser):
@staticmethod
def clean_up_description(description):
description = description.lstrip('//!<>')
description = description.lstrip("//!<>")
description = description.lstrip()
if description == '':
description = ' '
if description == "":
description = " "
return description
@ -232,28 +263,37 @@ def export_to_file(filename: str, event_list: list, file_separator: str):
event_id = entry[0]
event_value = entry[1]
file.write(
str(event_id) + file_separator + event_value[EVENT_ENTRY_NAME_IDX] + file_separator +
event_value[EVENT_ENTRY_SEVERITY_IDX] + file_separator +
event_value[EVENT_ENTRY_INFO_IDX] + file_separator + event_value[EVENT_SOURCE_FILE_IDX]
+ '\n'
str(event_id)
+ file_separator
+ event_value[EVENT_ENTRY_NAME_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_SEVERITY_IDX]
+ file_separator
+ event_value[EVENT_ENTRY_INFO_IDX]
+ file_separator
+ event_value[EVENT_SOURCE_FILE_IDX]
+ "\n"
)
file.close()
return
def write_translation_source_file(
event_list: list, date_string: str, filename: str = "translateEvents.cpp"
event_list: list, date_string: str, filename: str = "translateEvents.cpp"
):
outputfile = open(filename, "w")
definitions = ""
function = "const char * translateEvents(Event event) {\n switch( (event & 0xffff) ) {\n"
function = (
"const char * translateEvents(Event event) {\n switch( (event & 0xffff) ) {\n"
)
for entry in event_list:
event_id = entry[0]
event_value = entry[1]
definitions += \
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " \
f"= \"{event_value[EVENT_ENTRY_NAME_IDX]}\";\n"
definitions += (
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING "
f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n'
)
function += f" case({event_id}):\n return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
function += ' default:\n return "UNKNOWN_EVENT";\n'
outputfile.write(
@ -262,7 +302,7 @@ def write_translation_source_file(
f" * @details\n"
f" * Generated on: {date_string}\n */\n"
)
outputfile.write("#include \"translateEvents.h\"\n\n")
outputfile.write('#include "translateEvents.h"\n\n')
outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n")
outputfile.close()
@ -283,13 +323,20 @@ def handle_csv_export(file_name: str, event_list: list, file_separator: str):
Generates the CSV in the same directory as the .py file and copes the CSV to another
directory if specified.
"""
export_to_file(filename=file_name, event_list=event_list, file_separator=file_separator)
export_to_file(
filename=file_name, event_list=event_list, file_separator=file_separator
)
def handle_cpp_export(
event_list: list, date_string: str, file_name: str = "translateEvents.cpp",
generate_header: bool = True, header_file_name: str = "translateEvents.h"
event_list: list,
date_string: str,
file_name: str = "translateEvents.cpp",
generate_header: bool = True,
header_file_name: str = "translateEvents.h",
):
write_translation_source_file(event_list=event_list, date_string=date_string, filename=file_name)
write_translation_source_file(
event_list=event_list, date_string=date_string, filename=file_name
)
if generate_header:
write_translation_header_file(filename=header_file_name)

View File

@ -9,20 +9,24 @@ LOGGER = get_console_logger()
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: list):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
match = re.search(r'([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)', line)
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
if match:
self.mib_table.update({match.group(2): [match.group(1)]})
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int,
moving_window: list, *args, **kwargs
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
pass
@ -33,27 +37,31 @@ class ObjectDefinitionParser(FileParser):
def export_object_file(filename, object_list, file_separator: str = ","):
file = open(filename, "w")
for entry in object_list:
file.write(str(entry[0]) + file_separator + entry[1][0] + '\n')
file.write(str(entry[0]) + file_separator + entry[1][0] + "\n")
file.close()
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
outputfile = open(filename, "w")
LOGGER.info('ObjectParser: Writing translation file ' + filename)
LOGGER.info("ObjectParser: Writing translation file " + filename)
definitions = ""
function = "const char* translateObject(object_id_t object) " \
"{\n\tswitch( (object & 0xFFFFFFFF) ) {\n"
function = (
"const char* translateObject(object_id_t object) "
"{\n\tswitch( (object & 0xFFFFFFFF) ) {\n"
)
for entry in list_of_entries:
# first part of translate file
definitions += f"const char *{entry[1][0]}_STRING = \"{entry[1][0]}\";\n"
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += f"\tcase {entry[0]}:\n\t\treturn {entry[1][0]}_STRING;\n"
function += '\tdefault:\n\t\treturn "UNKNOWN_OBJECT";\n'
outputfile.write(f"/**\n * @brief\tAuto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n")
outputfile.write("#include \"translateObjects.h\"\n\n")
outputfile.write(
f"/**\n * @brief\tAuto-generated object translation file.\n"
f" * @details\n"
f" * Contains {len(list_of_entries)} translations.\n"
f" * Generated on: {date_string_full}\n */\n"
)
outputfile.write('#include "translateObjects.h"\n\n')
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
@ -70,14 +78,16 @@ def write_translation_header_file(filename: str = "translateObjects.h"):
def sql_object_exporter(
object_table: list, db_filename: str, delete_cmd: str, create_cmd: str, insert_cmd: str
object_table: list,
db_filename: str,
delete_cmd: str,
create_cmd: str,
insert_cmd: str,
):
sql_writer = SqlWriter(db_filename=db_filename)
sql_writer.delete(delete_cmd)
sql_writer.open(create_cmd)
for entry in object_table:
sql_writer.write_entries(
insert_cmd, (entry[0], entry[1][0])
)
sql_writer.write_entries(insert_cmd, (entry[0], entry[1][0]))
sql_writer.commit()
sql_writer.close()

View File

@ -16,6 +16,7 @@ class FileListParser:
and parses all included header files recursively.
TODO: Filter functionality for each directory to filter out files or folders
"""
def __init__(self, directory_list_or_name: Union[str, list]):
self.directory_list = []
if isinstance(directory_list_or_name, str):
@ -28,9 +29,12 @@ class FileListParser:
)
self.header_files = []
def parse_header_files(self, search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False):
def parse_header_files(
self,
search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False,
):
"""This function is called to get a list of header files
:param search_recursively:
:param printout_string:
@ -39,15 +43,21 @@ class FileListParser:
"""
print(printout_string, end="")
for directory in self.directory_list:
self.__get_header_file_list(directory, search_recursively, print_current_dir)
self.__get_header_file_list(
directory, search_recursively, print_current_dir
)
print(str(len(self.header_files)) + " header files were found.")
# g.PP.pprint(self.header_files)
return self.header_files
def __get_header_file_list(self, base_directory: str, seach_recursively: bool = False,
print_current_dir: bool = False):
if base_directory[-1] != '/':
base_directory += '/'
def __get_header_file_list(
self,
base_directory: str,
seach_recursively: bool = False,
print_current_dir: bool = False,
):
if base_directory[-1] != "/":
base_directory += "/"
local_header_files = []
if print_current_dir:
print("Parsing header files in: " + base_directory)
@ -58,7 +68,7 @@ class FileListParser:
if header_file_match:
if os.path.isfile(base_directory + entry):
match_string = header_file_match.group(0)
if match_string[0] == '.' or match_string[0] == '_':
if match_string[0] == "." or match_string[0] == "_":
pass
else:
local_header_files.append(base_directory + entry)

View File

@ -18,13 +18,13 @@ from typing import Dict, List
class VerbosityLevels(enum.Enum):
REDUCED = 0,
REGULAR = 1,
REDUCED = (0,)
REGULAR = (1,)
DEBUG = 2
class FileParserModes(enum.Enum):
REGULAR = enum.auto(),
REGULAR = (enum.auto(),)
MOVING_WINDOW = enum.auto()
@ -38,6 +38,7 @@ class FileParser:
3. Call parse_files. Additional arguments and keyword arguments can be supplied as well and
will be passed through to the abstract function implementations.
"""
def __init__(self, file_list):
if len(file_list) == 0:
print("File list is empty !")
@ -123,8 +124,14 @@ class FileParser:
@abstractmethod
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs):
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
"""
This will be called for the MOVING_WINDOW parser mode.
:param file_name: Current file name
@ -152,7 +159,10 @@ class FileParser:
return
moving_window = [""] * moving_window_size
for line_idx, line in enumerate(all_lines):
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window pre line anaylsis line {line_idx}")
print(moving_window)
# The moving window will start with only the bottom being in the file
@ -161,15 +171,19 @@ class FileParser:
# More and more of the window is inside the file now
elif line_idx < moving_window_size:
for idx in range(line_idx, 0, -1):
moving_window[moving_window_size - 1 - idx] = \
moving_window[moving_window_size - idx]
moving_window[moving_window_size - 1 - idx] = moving_window[
moving_window_size - idx
]
moving_window[moving_window_size - 1] = line
# The full window is inside the file now.
elif line_idx >= moving_window_size:
for idx in range(moving_window_size - 1):
moving_window[idx] = moving_window[idx + 1]
moving_window[moving_window_size - 1] = line
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window post line anaylsis line {line_idx}")
print(moving_window)
self._handle_file_parsing_moving_window(
@ -178,7 +192,10 @@ class FileParser:
# Now the moving window moved past the end of the file. Sections which are outside
# the file are assigned an empty string until the window has moved out of file completely
for remaining_windows_idx in range(moving_window_size):
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window pre line analysis post EOF")
print(moving_window)
num_entries_to_clear = remaining_windows_idx + 1
@ -186,7 +203,10 @@ class FileParser:
moving_window[moving_window_size - 1 - idx_to_clear] = ""
for idx_to_reassign in range(moving_window_size - 1 - num_entries_to_clear):
moving_window[idx_to_reassign] = moving_window[idx_to_reassign + 1]
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
if (
self.__debug_moving_window
and self.__debug_moving_window_filename in file_name
):
print(f"Moving window post line anaylsis post EOF")
print(moving_window)
pass
@ -199,16 +219,16 @@ class FileParser:
:return:
"""
try:
file = open(file_name, 'r', encoding='utf-8')
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
print("ReturnValueParser: Decoding error with file " + file_name)
file = open(file_name, 'r', encoding='cp1252')
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
return all_lines
def _build_multi_line_string_generic(
self, first_line: str, moving_window: List[str]
self, first_line: str, moving_window: List[str]
) -> str:
"""This function transforms a multi line match into a one line match by searching for the
semicolon at the string end"""
@ -227,7 +247,7 @@ class FileParser:
return all_lines
def _search_for_descrip_string_generic(
self, moving_window: List[str], break_pattern: str
self, moving_window: List[str], break_pattern: str
) -> str:
current_idx = self._moving_window_center_idx - 1
# Look at the line above first
@ -236,9 +256,7 @@ class FileParser:
)
if not descrip_match:
while True:
if re.search(
break_pattern, moving_window[current_idx]
):
if re.search(break_pattern, moving_window[current_idx]):
break
descrip_match = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]

View File

@ -20,7 +20,6 @@ INVALID_IF_ID = -1
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
@ -34,18 +33,23 @@ class InterfaceParser(FileParser):
self._debug_mode = enable
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
pass
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
self.file_name_table.append(file_name)
try:
file = open(file_name, 'r', encoding='utf-8')
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
self.__handle_regular_class_id_parsing(file_name=file_name, all_lines=all_lines)
@ -58,14 +62,14 @@ class InterfaceParser(FileParser):
target_end_name = ""
for line in all_lines:
if not start_matched:
match = re.search(r'[\s]*([\w]*) = [\s]*([\w]*)', line)
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
target_end_name = match.group(2)
start_matched = True
else:
match = re.search(r'[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?', line)
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
if match:
count += 1
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
@ -79,7 +83,9 @@ class InterfaceParser(FileParser):
current_file_table.update({count: [match.group(1), short_name]})
if not start_matched:
print("No start match detected when parsing interface files..")
print(f"Current file: {file_name} | Make sure to include a start definition")
print(
f"Current file: {file_name} | Make sure to include a start definition"
)
sys.exit(1)
if not end_matched:
print(
@ -101,7 +107,9 @@ class InterfaceParser(FileParser):
PrettyPrinter.pprint(self.mib_table)
@staticmethod
def __assign_start_end_indexes(start_name_list_list, end_name_list_list) -> Tuple[List, List]:
def __assign_start_end_indexes(
start_name_list_list, end_name_list_list
) -> Tuple[List, List]:
start_list_list_completed = start_name_list_list
end_list_list_completed = end_name_list_list
all_indexes_filled = False
@ -111,16 +119,19 @@ class InterfaceParser(FileParser):
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[1].isdigit():
start_list_list_completed[idx][2] = int(start_name_list[1])
end_list_list_completed[idx][1] = \
start_list_list_completed[idx][2] + start_list_list_completed[idx][3]
end_list_list_completed[idx][1] = (
start_list_list_completed[idx][2]
+ start_list_list_completed[idx][3]
)
target_end_name = start_name_list[1]
for end_name_list in end_list_list_completed:
end_name = end_name_list[0]
end_value = end_name_list[1]
if end_name == target_end_name and end_value is not None:
start_list_list_completed[idx][2] = end_value
end_list_list_completed[idx][1] = \
end_list_list_completed[idx][1] = (
end_value + start_list_list_completed[idx][3]
)
all_indexes_filled = True
for idx, start_name_list in enumerate(start_list_list_completed):
if start_name_list[2] is None or end_name_list_list[idx][1] is None:
@ -138,8 +149,12 @@ class InterfaceParser(FileParser):
dict_to_build = dict()
for local_count, interface_name_and_shortname in interface_dict.items():
dict_to_build.update(
{interface_name_and_shortname[0]: [local_count + count_start,
interface_name_and_shortname[1]]}
{
interface_name_and_shortname[0]: [
local_count + count_start,
interface_name_and_shortname[1],
]
}
)
self.mib_table.update(dict_to_build)
@ -148,6 +163,7 @@ class ReturnValueParser(FileParser):
"""
Generic return value parser.
"""
def __init__(self, interfaces, file_list, print_tables):
super().__init__(file_list)
self.print_tables = print_tables
@ -156,16 +172,29 @@ class ReturnValueParser(FileParser):
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.current_interface_id_entries = {
"Name": "",
"ID": 0,
"FullName": ""
}
self.return_value_dict.update({0: ('OK', 'System-wide code for ok.', 'RETURN_OK',
'HasReturnvaluesIF.h', 'HasReturnvaluesIF')})
self.return_value_dict.update({1: ('Failed', 'Unspecified system-wide code for failed.',
'RETURN_FAILED', 'HasReturnvaluesIF.h',
'HasReturnvaluesIF')})
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
self.return_value_dict.update(
{
0: (
"OK",
"System-wide code for ok.",
"RETURN_OK",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
)
}
)
self.return_value_dict.update(
{
1: (
"Failed",
"Unspecified system-wide code for failed.",
"RETURN_FAILED",
"HasReturnvaluesIF.h",
"HasReturnvaluesIF",
)
}
)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
"""Former way to parse returnvalues. Not recommended anymore.
@ -183,21 +212,28 @@ class ReturnValueParser(FileParser):
self.__handle_line_reading(line, file_name, print_truncated_entries)
def _handle_file_parsing_moving_window(
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
*args, **kwargs
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
):
"""Parse for returnvalues using a moving window"""
interface_id_match = re.search(
rf'{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)', moving_window[self._moving_window_center_idx]
rf"{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)",
moving_window[self._moving_window_center_idx],
)
if interface_id_match:
self.__handle_interfaceid_match(
interface_id_match=interface_id_match, file_name=file_name
)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)?[\s]*ReturnValue_t[\s]*([\w]*)[\s]*=[\s]*((?!;).*$)",
moving_window[self._moving_window_center_idx], re.DOTALL
moving_window[self._moving_window_center_idx],
re.DOTALL,
)
full_returnvalue_string = ""
if returnvalue_match:
@ -206,7 +242,7 @@ class ReturnValueParser(FileParser):
else:
full_returnvalue_string = self.__build_multi_line_returnvalue_string(
moving_window=moving_window,
first_line=moving_window[self._moving_window_center_idx]
first_line=moving_window[self._moving_window_center_idx],
)
number_match = INVALID_IF_ID
# Try to match for a string using the new API first. Example:
@ -215,14 +251,14 @@ class ReturnValueParser(FileParser):
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
full_returnvalue_string
full_returnvalue_string,
)
if not returnvalue_match:
# Try to match for old API using MAE_RETURN_CODE macro
returnvalue_match = re.search(
r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
r'MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)',
full_returnvalue_string
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)",
full_returnvalue_string,
)
if returnvalue_match:
number_match = returnvalue_match.group(2)
@ -231,17 +267,19 @@ class ReturnValueParser(FileParser):
if returnvalue_match:
description = self.__search_for_descrip_string(moving_window=moving_window)
if number_match == INVALID_IF_ID:
LOGGER.warning(f'Invalid number match detected for file {file_name}')
LOGGER.warning(f'Match groups:')
LOGGER.warning(f"Invalid number match detected for file {file_name}")
LOGGER.warning(f"Match groups:")
for group in returnvalue_match.groups():
LOGGER.info(group)
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1), file_name=file_name,
number_match=number_match, description=description
name_match=returnvalue_match.group(1),
file_name=file_name,
number_match=number_match,
description=description,
)
def __build_multi_line_returnvalue_string(
self, first_line: str, moving_window: List[str]
self, first_line: str, moving_window: List[str]
) -> str:
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
@ -249,29 +287,33 @@ class ReturnValueParser(FileParser):
def __search_for_descrip_string(self, moving_window: List[str]) -> str:
return self._search_for_descrip_string_generic(
moving_window=moving_window, break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t"
moving_window=moving_window,
break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t",
)
def __handle_line_reading(self, line, file_name, print_truncated_entries: bool):
newline = line
if self.last_lines[0] != '\n':
two_lines = self.last_lines[0] + ' ' + newline.strip()
if self.last_lines[0] != "\n":
two_lines = self.last_lines[0] + " " + newline.strip()
else:
two_lines = ''
interface_id_match = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)',
two_lines)
two_lines = ""
interface_id_match = re.search(
r"INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)", two_lines
)
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match, file_name=file_name)
returnvalue_match = re.search(
r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
r'MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)',
two_lines
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)",
two_lines,
)
if returnvalue_match:
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1), file_name=file_name, description="",
number_match=returnvalue_match.group(2)
name_match=returnvalue_match.group(1),
file_name=file_name,
description="",
number_match=returnvalue_match.group(2),
)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
@ -281,43 +323,50 @@ class ReturnValueParser(FileParser):
Returns whether the interface ID was found successfully in the IF ID header files
"""
if self.get_verbosity() == VerbosityLevels.DEBUG:
LOGGER.info(f'Interface ID {interface_id_match.group(1)} found in {file_name}')
LOGGER.info(
f"Interface ID {interface_id_match.group(1)} found in {file_name}"
)
if_id_entry = self.interfaces.get(interface_id_match.group(1))
if if_id_entry is not None:
self.current_interface_id_entries["ID"] = if_id_entry[0]
else:
LOGGER.warning(
f'Interface ID {interface_id_match.group(1)} not found in IF ID dictionary'
f"Interface ID {interface_id_match.group(1)} not found in IF ID dictionary"
)
return False
self.current_interface_id_entries["Name"] = \
self.interfaces[interface_id_match.group(1)][1]
self.current_interface_id_entries["Name"] = self.interfaces[
interface_id_match.group(1)
][1]
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
if self.get_verbosity() == VerbosityLevels.DEBUG:
current_id = self.current_interface_id_entries["ID"]
LOGGER.info(f'Current ID: {current_id}')
LOGGER.info(f"Current ID: {current_id}")
return True
def __handle_returnvalue_match(
self, name_match: str, number_match: str, file_name: str, description: str
self, name_match: str, number_match: str, file_name: str, description: str
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"], name_match, MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES
self.current_interface_id_entries["Name"],
name_match,
MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES,
)
full_id = (self.current_interface_id_entries["ID"] << 8) + \
return_number_from_string(number_match)
full_id = (
self.current_interface_id_entries["ID"] << 8
) + return_number_from_string(number_match)
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
dict_tuple = (
string_to_add, description, number_match, file_name,
self.current_interface_id_entries["FullName"]
string_to_add,
description,
number_match,
file_name,
self.current_interface_id_entries["FullName"],
)
self.return_value_dict.update({
full_id: dict_tuple
})
self.return_value_dict.update({full_id: dict_tuple})
self.count = self.count + 1
def _post_parsing_operation(self):
@ -330,19 +379,33 @@ class ReturnValueParser(FileParser):
file = open(filename, "w")
for entry in list_of_entries.items():
file.write(
hex(entry[0]) + file_separator + entry[1][0] + file_separator + entry[1][1] +
file_separator + entry[1][2] + file_separator + entry[1][3] + file_separator +
entry[1][4] + '\n'
hex(entry[0])
+ file_separator
+ entry[1][0]
+ file_separator
+ entry[1][1]
+ file_separator
+ entry[1][2]
+ file_separator
+ entry[1][3]
+ file_separator
+ entry[1][4]
+ "\n"
)
file.close()
def build_checked_string(self, first_part, second_part, max_string_len: int,
print_truncated_entries: bool):
""" Build a checked string """
my_str = first_part + '_' + self.convert(second_part)
def build_checked_string(
self,
first_part,
second_part,
max_string_len: int,
print_truncated_entries: bool,
):
"""Build a checked string"""
my_str = first_part + "_" + self.convert(second_part)
if len(my_str) > max_string_len:
if print_truncated_entries:
LOGGER.warning(f'Entry {my_str} too long. Will truncate.')
LOGGER.warning(f"Entry {my_str} too long. Will truncate.")
my_str = my_str[0:max_string_len]
else:
# print("Entry: " + myStr + " is all right.")
@ -351,8 +414,8 @@ class ReturnValueParser(FileParser):
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
single_strings = name.split("_")
new_string = ""
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
@ -361,17 +424,16 @@ class ReturnValueParser(FileParser):
@staticmethod
def clean_up_description(descr_string):
description = descr_string.lstrip('!<- ')
if description == '':
description = ' '
description = descr_string.lstrip("!<- ")
if description == "":
description = " "
return description
def return_number_from_string(a_string):
if a_string.startswith('0x'):
if a_string.startswith("0x"):
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
LOGGER.warning(f'Illegal number representation: {a_string}')
LOGGER.warning(f"Illegal number representation: {a_string}")
return 0

View File

@ -4,7 +4,11 @@ from fsfwgen.utility.file_management import copy_file, move_file
# TODO: Export to SQL
class CsvWriter:
def __init__(
self, filename: str, table_to_print=None, header_array=None, file_separator: str = ","
self,
filename: str,
table_to_print=None,
header_array=None,
file_separator: str = ",",
):
if header_array is None:
header_array = []
@ -22,7 +26,7 @@ class CsvWriter:
file.write("Index" + self.file_separator)
for index in range(self.column_numbers):
# noinspection PyTypeChecker
if index < len(self.header_array)-1:
if index < len(self.header_array) - 1:
file.write(self.header_array[index] + self.file_separator)
else:
file.write(self.header_array[index] + "\n")

View File

@ -2,36 +2,37 @@
import shutil
import os
from fsfwgen.core import get_console_logger
LOGGER = get_console_logger()
def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False):
if not os.path.exists(filename):
LOGGER.warning(f'File {filename} does not exist')
LOGGER.warning(f"File {filename} does not exist")
return
if not os.path.exists(destination):
LOGGER.warning(f'Destination directory {destination} does not exist')
LOGGER.warning(f"Destination directory {destination} does not exist")
return
try:
shutil.copy2(filename, destination)
except FileNotFoundError:
LOGGER.exception('File not found!')
LOGGER.exception("File not found!")
except shutil.SameFileError:
LOGGER.exception('Source and destination are the same!')
LOGGER.exception("Source and destination are the same!")
def move_file(file_name: str, destination: str = ""):
if not os.path.exists(file_name):
print(f'move_file: File {file_name} does not exist')
print(f"move_file: File {file_name} does not exist")
return
if not os.path.exists(destination):
print(f'move_file: Destination directory {destination} does not exist')
print(f"move_file: Destination directory {destination} does not exist")
return
try:
shutil.copy2(file_name, destination)
os.remove(file_name)
return
except FileNotFoundError:
LOGGER.exception('File not found!')
LOGGER.exception("File not found!")
except shutil.SameFileError:
LOGGER.exception('Source and destination are the same!')
LOGGER.exception("Source and destination are the same!")

View File

@ -13,4 +13,3 @@ class Printer:
print(leading_string)
PrettyPrinter.pprint(dictionary)
print("\r\n", end="")

View File

@ -11,7 +11,7 @@ class SqlWriter:
self.conn = sqlite3.connect(self.filename)
def open(self, sql_creation_command: str):
LOGGER.info(f'SQL Writer: Opening {self.filename}')
LOGGER.info(f"SQL Writer: Opening {self.filename}")
self.conn.execute(sql_creation_command)
def delete(self, sql_deletion_command):
@ -31,7 +31,7 @@ class SqlWriter:
self.conn.close()
def sql_writing_helper(
self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = ""
self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = ""
):
if deletion_cmd != "":
self.delete(deletion_cmd)