diff --git a/core.py b/core.py index e2dd7f8..603af39 100644 --- a/core.py +++ b/core.py @@ -1,14 +1,33 @@ import enum +import sys import colorlog import logging import argparse -CONSOLE_LOGGER_NAME = 'FSFW Generator Logger' +CONSOLE_LOGGER_NAME = "FSFW Generator Logger" LOGGER_INSTANCE = None +class InfoFilter(logging.Filter): + """Filter object, which is used so that only INFO and DEBUG messages are printed to stdout.""" + + def filter(self, rec): + if rec.levelno == logging.INFO: + return rec.levelno + return None + + +class DebugFilter(logging.Filter): + """Filter object, which is used so that only DEBUG messages are printed to stdout.""" + + def filter(self, rec): + if rec.levelno == logging.DEBUG: + return rec.levelno + return None + + def get_console_logger(): global LOGGER_INSTANCE if LOGGER_INSTANCE is None: @@ -17,35 +36,48 @@ def get_console_logger(): def init_console_logger(): - handler = colorlog.StreamHandler() - handler.setFormatter(colorlog.ColoredFormatter( - '%(log_color)s%(levelname)-8s | %(reset)s%(message)s' - )) - logger = colorlog.getLogger(CONSOLE_LOGGER_NAME) - logger.addHandler(handler) + generic_format = colorlog.ColoredFormatter( + "%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s" + ) + + console_info_handler = colorlog.StreamHandler(stream=sys.stdout) + console_info_handler.setLevel(logging.INFO) + console_info_handler.addFilter(InfoFilter()) + + console_debug_handler = logging.StreamHandler(stream=sys.stdout) + console_debug_handler.setLevel(logging.DEBUG) + console_debug_handler.addFilter(DebugFilter()) + + console_info_handler.setFormatter(generic_format) + console_info_handler.addFilter(InfoFilter()) + console_debug_handler.addFilter(DebugFilter()) + logger.addHandler(console_info_handler) + logger.addHandler(console_debug_handler) logger.setLevel(logging.DEBUG) return logger class ParserTypes(enum.Enum): - EVENTS = 'events', - OBJECTS = 'objects', - RETVALS = 'returnvalues', - SUBSERVICES = 'subservices' + EVENTS = ("events",) + OBJECTS = ("objects",) + RETVALS = ("returnvalues",) + SUBSERVICES = "subservices" def init_printout(project_string: str): global LOGGER_INSTANCE LOGGER_INSTANCE = get_console_logger() - print(f'-- {project_string} MOD Generator --') + print(f"-- {project_string} MOD Generator --") def return_generic_args_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser('Arguments for FSFW MOD generation') + parser = argparse.ArgumentParser("Arguments for FSFW MOD generation") choices = ("events", "objects", "returnvalues", "subservices") parser.add_argument( - 'type', metavar='type', choices=choices, - help=f'Type of MOD data to generate. Choices: {choices}' + "type", + metavar="type", + choices=choices, + help=f"Type of MOD data to generate. Choices: {choices}", ) return parser diff --git a/events/event_parser.py b/events/event_parser.py index fa6e21c..f209752 100644 --- a/events/event_parser.py +++ b/events/event_parser.py @@ -25,14 +25,22 @@ class SubsystemDefinitionParser(FileParser): def _handle_file_parsing(self, file_name: str, *args, **kwargs): file = open(file_name, "r") for line in file.readlines(): - match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', line) + match = re.search(r"([A-Z0-9_]*) = ([0-9]{1,3})", line) if match: self.mib_table.update({match.group(1): [match.group(2)]}) def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, - *args, **kwargs): - match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', moving_window[self.moving_window_center_idx]) + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, + ): + match = re.search( + r"([A-Z0-9_]*) = ([0-9]{1,3})", moving_window[self.moving_window_center_idx] + ) if match: self.mib_table.update({match.group(1): [match.group(2)]}) @@ -52,10 +60,10 @@ class EventParser(FileParser): def _handle_file_parsing(self, file_name: str, *args: any, **kwargs): try: - file = open(file_name, 'r', encoding='utf-8') + file = open(file_name, "r", encoding="utf-8") all_lines = file.readlines() except UnicodeDecodeError: - file = open(file_name, 'r', encoding='cp1252') + file = open(file_name, "r", encoding="cp1252") all_lines = file.readlines() total_count = 0 for line in all_lines: @@ -66,17 +74,25 @@ class EventParser(FileParser): self.count = 0 def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, - *args, **kwargs): + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, + ): subsystem_id_assignment_match = re.search( rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);", - moving_window[self.moving_window_center_idx] + moving_window[self.moving_window_center_idx], ) if subsystem_id_assignment_match: # For now, it is assumed that there is only going to be one subsystem ID per # class / source file try: - self.current_id = self.interfaces[subsystem_id_assignment_match.group(2)][0] + self.current_id = self.interfaces[ + subsystem_id_assignment_match.group(2) + ][0] self.my_id = self.return_number_from_string(self.current_id) except KeyError as e: print(f"Key not found: {e}") @@ -84,30 +100,33 @@ class EventParser(FileParser): # Now try to look for event definitions. Moving windows allows multi line event definitions # These two variants need to be checked event_match = re.match( - r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*event::makeEvent[^\n]*", - moving_window[self.moving_window_center_idx] + r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=", + moving_window[self.moving_window_center_idx], ) macro_api_match = False - if not event_match: - event_match = re.match( - r"[\s]*static[\s]*const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*MAKE_EVENT[^\n]*", - moving_window[self.moving_window_center_idx] - ) - if event_match: + for idx in range(0, 3): + if "MAKE_EVENT" in moving_window[self.moving_window_center_idx + idx]: macro_api_match = True + break + elif "makeEvent" in moving_window[self.moving_window_center_idx + idx]: + break + else: + event_match = False if event_match: self.__handle_event_match( - event_match=event_match, macro_api_match=macro_api_match, - moving_window=moving_window, file_name=file_name + event_match=event_match, + macro_api_match=macro_api_match, + moving_window=moving_window, + file_name=file_name, ) def __handle_event_match( - self, event_match, macro_api_match: bool, moving_window: list, file_name: str + self, event_match, macro_api_match: bool, moving_window: list, file_name: str ): if ";" in event_match.group(0): event_full_match = self.__generate_regex_event_match( macro_api_match=macro_api_match, - full_string=moving_window[self.moving_window_center_idx] + full_string=moving_window[self.moving_window_center_idx], ) else: multi_line_string = self.__build_multi_line_event_string( @@ -117,17 +136,20 @@ class EventParser(FileParser): macro_api_match=macro_api_match, full_string=multi_line_string ) description = self._search_for_descrip_string_generic( - moving_window=moving_window, break_pattern=r'[\s]*static const(?:expr)?[\s]*Event[\s]*' + moving_window=moving_window, + break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*", ) if event_full_match: name = event_match.group(EVENT_NAME_IDX) if macro_api_match: full_id = (self.my_id * 100) + self.return_number_from_string( - event_full_match.group(2)) + event_full_match.group(2) + ) severity = event_full_match.group(3) else: full_id = (self.my_id * 100) + self.return_number_from_string( - event_full_match.group(3)) + event_full_match.group(3) + ) severity = event_full_match.group(4) self.mib_table.update({full_id: (name, severity, description, file_name)}) self.count = self.count + 1 @@ -136,18 +158,20 @@ class EventParser(FileParser): def __generate_regex_event_match(macro_api_match: bool, full_string: str): if macro_api_match: # One line event definition. - regex_string = \ - r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \ + regex_string = ( + r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;" + ) else: - regex_string = \ - r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \ + regex_string = ( + r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;" + ) event_full_match = re.search(regex_string, full_string) return event_full_match def __build_multi_line_event_string( - self, first_line: str, moving_window: List[str] + self, first_line: str, moving_window: List[str] ) -> str: return self._build_multi_line_string_generic( first_line=first_line, moving_window=moving_window @@ -157,18 +181,21 @@ class EventParser(FileParser): pass def __handle_line_reading(self, line, file_name): - if not self.last_lines[0] == '\n': - twolines = self.last_lines[0] + ' ' + line.strip() + if not self.last_lines[0] == "\n": + twolines = self.last_lines[0] + " " + line.strip() else: - twolines = '' - match1 = re.search(r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines) + twolines = "" + match1 = re.search( + r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines + ) if match1: self.current_id = self.interfaces[match1.group(1)][0] # print( "Current ID: " + str(currentId) ) self.my_id = self.return_number_from_string(self.current_id) match = re.search( r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*" - r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", twolines + r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", + twolines, ) if match: if match.group(1): @@ -178,13 +205,17 @@ class EventParser(FileParser): if match.group(6): description = self.clean_up_description(match.group(6)) string_to_add = match.group(2) - full_id = (self.my_id * 100) + self.return_number_from_string(match.group(3)) + full_id = (self.my_id * 100) + self.return_number_from_string( + match.group(3) + ) severity = match.group(4) if full_id in self.mib_table: # print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name + # " was already in " + self.mib_table[full_id][3]) pass - self.mib_table.update({full_id: (string_to_add, severity, description, file_name)}) + self.mib_table.update( + {full_id: (string_to_add, severity, description, file_name)} + ) self.count = self.count + 1 self.last_lines[0] = line @@ -199,18 +230,18 @@ class EventParser(FileParser): @staticmethod def return_number_from_string(a_string): - if a_string.startswith('0x'): + if a_string.startswith("0x"): return int(a_string, 16) elif a_string.isdigit(): return int(a_string) else: - print('EventParser: Illegal number representation: ' + a_string) + print("EventParser: Illegal number representation: " + a_string) return 0 @staticmethod def convert(name): - single_strings = name.split('_') - new_string = '' + single_strings = name.split("_") + new_string = "" for one_string in single_strings: one_string = one_string.lower() one_string = one_string.capitalize() @@ -219,10 +250,10 @@ class EventParser(FileParser): @staticmethod def clean_up_description(description): - description = description.lstrip('//!<>') + description = description.lstrip("//!<>") description = description.lstrip() - if description == '': - description = ' ' + if description == "": + description = " " return description @@ -232,28 +263,37 @@ def export_to_file(filename: str, event_list: list, file_separator: str): event_id = entry[0] event_value = entry[1] file.write( - str(event_id) + file_separator + event_value[EVENT_ENTRY_NAME_IDX] + file_separator + - event_value[EVENT_ENTRY_SEVERITY_IDX] + file_separator + - event_value[EVENT_ENTRY_INFO_IDX] + file_separator + event_value[EVENT_SOURCE_FILE_IDX] - + '\n' + str(event_id) + + file_separator + + event_value[EVENT_ENTRY_NAME_IDX] + + file_separator + + event_value[EVENT_ENTRY_SEVERITY_IDX] + + file_separator + + event_value[EVENT_ENTRY_INFO_IDX] + + file_separator + + event_value[EVENT_SOURCE_FILE_IDX] + + "\n" ) file.close() return def write_translation_source_file( - event_list: list, date_string: str, filename: str = "translateEvents.cpp" + event_list: list, date_string: str, filename: str = "translateEvents.cpp" ): outputfile = open(filename, "w") definitions = "" - function = "const char * translateEvents(Event event) {\n switch( (event & 0xffff) ) {\n" + function = ( + "const char * translateEvents(Event event) {\n switch( (event & 0xffff) ) {\n" + ) for entry in event_list: event_id = entry[0] event_value = entry[1] - definitions += \ - f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " \ - f"= \"{event_value[EVENT_ENTRY_NAME_IDX]}\";\n" + definitions += ( + f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " + f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n' + ) function += f" case({event_id}):\n return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n" function += ' default:\n return "UNKNOWN_EVENT";\n' outputfile.write( @@ -262,7 +302,7 @@ def write_translation_source_file( f" * @details\n" f" * Generated on: {date_string}\n */\n" ) - outputfile.write("#include \"translateEvents.h\"\n\n") + outputfile.write('#include "translateEvents.h"\n\n') outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n") outputfile.close() @@ -283,13 +323,20 @@ def handle_csv_export(file_name: str, event_list: list, file_separator: str): Generates the CSV in the same directory as the .py file and copes the CSV to another directory if specified. """ - export_to_file(filename=file_name, event_list=event_list, file_separator=file_separator) + export_to_file( + filename=file_name, event_list=event_list, file_separator=file_separator + ) def handle_cpp_export( - event_list: list, date_string: str, file_name: str = "translateEvents.cpp", - generate_header: bool = True, header_file_name: str = "translateEvents.h" + event_list: list, + date_string: str, + file_name: str = "translateEvents.cpp", + generate_header: bool = True, + header_file_name: str = "translateEvents.h", ): - write_translation_source_file(event_list=event_list, date_string=date_string, filename=file_name) + write_translation_source_file( + event_list=event_list, date_string=date_string, filename=file_name + ) if generate_header: write_translation_header_file(filename=header_file_name) diff --git a/objects/objects.py b/objects/objects.py index 0b8f376..de0a33d 100644 --- a/objects/objects.py +++ b/objects/objects.py @@ -9,20 +9,24 @@ LOGGER = get_console_logger() class ObjectDefinitionParser(FileParser): - def __init__(self, file_list: list): super().__init__(file_list) def _handle_file_parsing(self, file_name: str, *args, **kwargs): file = open(file_name, "r", encoding="utf-8") for line in file.readlines(): - match = re.search(r'([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)', line) + match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line) if match: self.mib_table.update({match.group(2): [match.group(1)]}) def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, - moving_window: list, *args, **kwargs + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, ): pass @@ -33,27 +37,31 @@ class ObjectDefinitionParser(FileParser): def export_object_file(filename, object_list, file_separator: str = ","): file = open(filename, "w") for entry in object_list: - file.write(str(entry[0]) + file_separator + entry[1][0] + '\n') + file.write(str(entry[0]) + file_separator + entry[1][0] + "\n") file.close() def write_translation_file(filename: str, list_of_entries, date_string_full: str): outputfile = open(filename, "w") - LOGGER.info('ObjectParser: Writing translation file ' + filename) + LOGGER.info("ObjectParser: Writing translation file " + filename) definitions = "" - function = "const char* translateObject(object_id_t object) " \ - "{\n\tswitch( (object & 0xFFFFFFFF) ) {\n" + function = ( + "const char* translateObject(object_id_t object) " + "{\n\tswitch( (object & 0xFFFFFFFF) ) {\n" + ) for entry in list_of_entries: # first part of translate file - definitions += f"const char *{entry[1][0]}_STRING = \"{entry[1][0]}\";\n" + definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n' # second part of translate file. entry[i] contains 32 bit hexadecimal numbers function += f"\tcase {entry[0]}:\n\t\treturn {entry[1][0]}_STRING;\n" function += '\tdefault:\n\t\treturn "UNKNOWN_OBJECT";\n' - outputfile.write(f"/**\n * @brief\tAuto-generated object translation file.\n" - f" * @details\n" - f" * Contains {len(list_of_entries)} translations.\n" - f" * Generated on: {date_string_full}\n */\n") - outputfile.write("#include \"translateObjects.h\"\n\n") + outputfile.write( + f"/**\n * @brief\tAuto-generated object translation file.\n" + f" * @details\n" + f" * Contains {len(list_of_entries)} translations.\n" + f" * Generated on: {date_string_full}\n */\n" + ) + outputfile.write('#include "translateObjects.h"\n\n') outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n") outputfile.close() @@ -70,14 +78,16 @@ def write_translation_header_file(filename: str = "translateObjects.h"): def sql_object_exporter( - object_table: list, db_filename: str, delete_cmd: str, create_cmd: str, insert_cmd: str + object_table: list, + db_filename: str, + delete_cmd: str, + create_cmd: str, + insert_cmd: str, ): sql_writer = SqlWriter(db_filename=db_filename) sql_writer.delete(delete_cmd) sql_writer.open(create_cmd) for entry in object_table: - sql_writer.write_entries( - insert_cmd, (entry[0], entry[1][0]) - ) + sql_writer.write_entries(insert_cmd, (entry[0], entry[1][0])) sql_writer.commit() sql_writer.close() diff --git a/parserbase/file_list_parser.py b/parserbase/file_list_parser.py index f6101d9..ab02e54 100644 --- a/parserbase/file_list_parser.py +++ b/parserbase/file_list_parser.py @@ -16,6 +16,7 @@ class FileListParser: and parses all included header files recursively. TODO: Filter functionality for each directory to filter out files or folders """ + def __init__(self, directory_list_or_name: Union[str, list]): self.directory_list = [] if isinstance(directory_list_or_name, str): @@ -28,9 +29,12 @@ class FileListParser: ) self.header_files = [] - def parse_header_files(self, search_recursively: bool = False, - printout_string: str = "Parsing header files: ", - print_current_dir: bool = False): + def parse_header_files( + self, + search_recursively: bool = False, + printout_string: str = "Parsing header files: ", + print_current_dir: bool = False, + ): """This function is called to get a list of header files :param search_recursively: :param printout_string: @@ -39,15 +43,21 @@ class FileListParser: """ print(printout_string, end="") for directory in self.directory_list: - self.__get_header_file_list(directory, search_recursively, print_current_dir) + self.__get_header_file_list( + directory, search_recursively, print_current_dir + ) print(str(len(self.header_files)) + " header files were found.") # g.PP.pprint(self.header_files) return self.header_files - def __get_header_file_list(self, base_directory: str, seach_recursively: bool = False, - print_current_dir: bool = False): - if base_directory[-1] != '/': - base_directory += '/' + def __get_header_file_list( + self, + base_directory: str, + seach_recursively: bool = False, + print_current_dir: bool = False, + ): + if base_directory[-1] != "/": + base_directory += "/" local_header_files = [] if print_current_dir: print("Parsing header files in: " + base_directory) @@ -58,7 +68,7 @@ class FileListParser: if header_file_match: if os.path.isfile(base_directory + entry): match_string = header_file_match.group(0) - if match_string[0] == '.' or match_string[0] == '_': + if match_string[0] == "." or match_string[0] == "_": pass else: local_header_files.append(base_directory + entry) diff --git a/parserbase/parser.py b/parserbase/parser.py index 8e27619..7c4f49a 100644 --- a/parserbase/parser.py +++ b/parserbase/parser.py @@ -18,13 +18,13 @@ from typing import Dict, List class VerbosityLevels(enum.Enum): - REDUCED = 0, - REGULAR = 1, + REDUCED = (0,) + REGULAR = (1,) DEBUG = 2 class FileParserModes(enum.Enum): - REGULAR = enum.auto(), + REGULAR = (enum.auto(),) MOVING_WINDOW = enum.auto() @@ -38,6 +38,7 @@ class FileParser: 3. Call parse_files. Additional arguments and keyword arguments can be supplied as well and will be passed through to the abstract function implementations. """ + def __init__(self, file_list): if len(file_list) == 0: print("File list is empty !") @@ -123,8 +124,14 @@ class FileParser: @abstractmethod def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, - *args, **kwargs): + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, + ): """ This will be called for the MOVING_WINDOW parser mode. :param file_name: Current file name @@ -152,7 +159,10 @@ class FileParser: return moving_window = [""] * moving_window_size for line_idx, line in enumerate(all_lines): - if self.__debug_moving_window and self.__debug_moving_window_filename in file_name: + if ( + self.__debug_moving_window + and self.__debug_moving_window_filename in file_name + ): print(f"Moving window pre line anaylsis line {line_idx}") print(moving_window) # The moving window will start with only the bottom being in the file @@ -161,15 +171,19 @@ class FileParser: # More and more of the window is inside the file now elif line_idx < moving_window_size: for idx in range(line_idx, 0, -1): - moving_window[moving_window_size - 1 - idx] = \ - moving_window[moving_window_size - idx] + moving_window[moving_window_size - 1 - idx] = moving_window[ + moving_window_size - idx + ] moving_window[moving_window_size - 1] = line # The full window is inside the file now. elif line_idx >= moving_window_size: for idx in range(moving_window_size - 1): moving_window[idx] = moving_window[idx + 1] moving_window[moving_window_size - 1] = line - if self.__debug_moving_window and self.__debug_moving_window_filename in file_name: + if ( + self.__debug_moving_window + and self.__debug_moving_window_filename in file_name + ): print(f"Moving window post line anaylsis line {line_idx}") print(moving_window) self._handle_file_parsing_moving_window( @@ -178,7 +192,10 @@ class FileParser: # Now the moving window moved past the end of the file. Sections which are outside # the file are assigned an empty string until the window has moved out of file completely for remaining_windows_idx in range(moving_window_size): - if self.__debug_moving_window and self.__debug_moving_window_filename in file_name: + if ( + self.__debug_moving_window + and self.__debug_moving_window_filename in file_name + ): print(f"Moving window pre line analysis post EOF") print(moving_window) num_entries_to_clear = remaining_windows_idx + 1 @@ -186,7 +203,10 @@ class FileParser: moving_window[moving_window_size - 1 - idx_to_clear] = "" for idx_to_reassign in range(moving_window_size - 1 - num_entries_to_clear): moving_window[idx_to_reassign] = moving_window[idx_to_reassign + 1] - if self.__debug_moving_window and self.__debug_moving_window_filename in file_name: + if ( + self.__debug_moving_window + and self.__debug_moving_window_filename in file_name + ): print(f"Moving window post line anaylsis post EOF") print(moving_window) pass @@ -199,16 +219,16 @@ class FileParser: :return: """ try: - file = open(file_name, 'r', encoding='utf-8') + file = open(file_name, "r", encoding="utf-8") all_lines = file.readlines() except UnicodeDecodeError: print("ReturnValueParser: Decoding error with file " + file_name) - file = open(file_name, 'r', encoding='cp1252') + file = open(file_name, "r", encoding="cp1252") all_lines = file.readlines() return all_lines def _build_multi_line_string_generic( - self, first_line: str, moving_window: List[str] + self, first_line: str, moving_window: List[str] ) -> str: """This function transforms a multi line match into a one line match by searching for the semicolon at the string end""" @@ -227,7 +247,7 @@ class FileParser: return all_lines def _search_for_descrip_string_generic( - self, moving_window: List[str], break_pattern: str + self, moving_window: List[str], break_pattern: str ) -> str: current_idx = self._moving_window_center_idx - 1 # Look at the line above first @@ -236,9 +256,7 @@ class FileParser: ) if not descrip_match: while True: - if re.search( - break_pattern, moving_window[current_idx] - ): + if re.search(break_pattern, moving_window[current_idx]): break descrip_match = re.search( r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx] diff --git a/returnvalues/returnvalues_parser.py b/returnvalues/returnvalues_parser.py index 29e4cc5..ee341df 100644 --- a/returnvalues/returnvalues_parser.py +++ b/returnvalues/returnvalues_parser.py @@ -20,7 +20,6 @@ INVALID_IF_ID = -1 class InterfaceParser(FileParser): - def __init__(self, file_list: list, print_table: bool = False): super().__init__(file_list) self.print_table = print_table @@ -34,18 +33,23 @@ class InterfaceParser(FileParser): self._debug_mode = enable def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, - *args, **kwargs + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, ): pass def _handle_file_parsing(self, file_name: str, *args, **kwargs): self.file_name_table.append(file_name) try: - file = open(file_name, 'r', encoding='utf-8') + file = open(file_name, "r", encoding="utf-8") all_lines = file.readlines() except UnicodeDecodeError: - file = open(file_name, 'r', encoding='cp1252') + file = open(file_name, "r", encoding="cp1252") all_lines = file.readlines() self.__handle_regular_class_id_parsing(file_name=file_name, all_lines=all_lines) @@ -58,14 +62,14 @@ class InterfaceParser(FileParser): target_end_name = "" for line in all_lines: if not start_matched: - match = re.search(r'[\s]*([\w]*) = [\s]*([\w]*)', line) + match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line) if match: # current_file_table.update({count: [match.group(1), match.group(2)]}) start_name = match.group(1) target_end_name = match.group(2) start_matched = True else: - match = re.search(r'[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?', line) + match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line) if match: count += 1 if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)): @@ -79,7 +83,9 @@ class InterfaceParser(FileParser): current_file_table.update({count: [match.group(1), short_name]}) if not start_matched: print("No start match detected when parsing interface files..") - print(f"Current file: {file_name} | Make sure to include a start definition") + print( + f"Current file: {file_name} | Make sure to include a start definition" + ) sys.exit(1) if not end_matched: print( @@ -101,7 +107,9 @@ class InterfaceParser(FileParser): PrettyPrinter.pprint(self.mib_table) @staticmethod - def __assign_start_end_indexes(start_name_list_list, end_name_list_list) -> Tuple[List, List]: + def __assign_start_end_indexes( + start_name_list_list, end_name_list_list + ) -> Tuple[List, List]: start_list_list_completed = start_name_list_list end_list_list_completed = end_name_list_list all_indexes_filled = False @@ -111,16 +119,19 @@ class InterfaceParser(FileParser): for idx, start_name_list in enumerate(start_list_list_completed): if start_name_list[1].isdigit(): start_list_list_completed[idx][2] = int(start_name_list[1]) - end_list_list_completed[idx][1] = \ - start_list_list_completed[idx][2] + start_list_list_completed[idx][3] + end_list_list_completed[idx][1] = ( + start_list_list_completed[idx][2] + + start_list_list_completed[idx][3] + ) target_end_name = start_name_list[1] for end_name_list in end_list_list_completed: end_name = end_name_list[0] end_value = end_name_list[1] if end_name == target_end_name and end_value is not None: start_list_list_completed[idx][2] = end_value - end_list_list_completed[idx][1] = \ + end_list_list_completed[idx][1] = ( end_value + start_list_list_completed[idx][3] + ) all_indexes_filled = True for idx, start_name_list in enumerate(start_list_list_completed): if start_name_list[2] is None or end_name_list_list[idx][1] is None: @@ -138,8 +149,12 @@ class InterfaceParser(FileParser): dict_to_build = dict() for local_count, interface_name_and_shortname in interface_dict.items(): dict_to_build.update( - {interface_name_and_shortname[0]: [local_count + count_start, - interface_name_and_shortname[1]]} + { + interface_name_and_shortname[0]: [ + local_count + count_start, + interface_name_and_shortname[1], + ] + } ) self.mib_table.update(dict_to_build) @@ -148,6 +163,7 @@ class ReturnValueParser(FileParser): """ Generic return value parser. """ + def __init__(self, interfaces, file_list, print_tables): super().__init__(file_list) self.print_tables = print_tables @@ -156,16 +172,29 @@ class ReturnValueParser(FileParser): self.count = 0 # Stores last three lines self.last_lines = ["", "", ""] - self.current_interface_id_entries = { - "Name": "", - "ID": 0, - "FullName": "" - } - self.return_value_dict.update({0: ('OK', 'System-wide code for ok.', 'RETURN_OK', - 'HasReturnvaluesIF.h', 'HasReturnvaluesIF')}) - self.return_value_dict.update({1: ('Failed', 'Unspecified system-wide code for failed.', - 'RETURN_FAILED', 'HasReturnvaluesIF.h', - 'HasReturnvaluesIF')}) + self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""} + self.return_value_dict.update( + { + 0: ( + "OK", + "System-wide code for ok.", + "RETURN_OK", + "HasReturnvaluesIF.h", + "HasReturnvaluesIF", + ) + } + ) + self.return_value_dict.update( + { + 1: ( + "Failed", + "Unspecified system-wide code for failed.", + "RETURN_FAILED", + "HasReturnvaluesIF.h", + "HasReturnvaluesIF", + ) + } + ) def _handle_file_parsing(self, file_name: str, *args, **kwargs): """Former way to parse returnvalues. Not recommended anymore. @@ -183,21 +212,28 @@ class ReturnValueParser(FileParser): self.__handle_line_reading(line, file_name, print_truncated_entries) def _handle_file_parsing_moving_window( - self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, - *args, **kwargs + self, + file_name: str, + current_line: int, + moving_window_size: int, + moving_window: list, + *args, + **kwargs, ): """Parse for returnvalues using a moving window""" interface_id_match = re.search( - rf'{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)', moving_window[self._moving_window_center_idx] + rf"{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)", + moving_window[self._moving_window_center_idx], ) - + if interface_id_match: self.__handle_interfaceid_match( interface_id_match=interface_id_match, file_name=file_name ) returnvalue_match = re.search( r"^[\s]*static const(?:expr)?[\s]*ReturnValue_t[\s]*([\w]*)[\s]*=[\s]*((?!;).*$)", - moving_window[self._moving_window_center_idx], re.DOTALL + moving_window[self._moving_window_center_idx], + re.DOTALL, ) full_returnvalue_string = "" if returnvalue_match: @@ -206,7 +242,7 @@ class ReturnValueParser(FileParser): else: full_returnvalue_string = self.__build_multi_line_returnvalue_string( moving_window=moving_window, - first_line=moving_window[self._moving_window_center_idx] + first_line=moving_window[self._moving_window_center_idx], ) number_match = INVALID_IF_ID # Try to match for a string using the new API first. Example: @@ -215,14 +251,14 @@ class ReturnValueParser(FileParser): returnvalue_match = re.search( r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*" r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)", - full_returnvalue_string + full_returnvalue_string, ) if not returnvalue_match: # Try to match for old API using MAE_RETURN_CODE macro returnvalue_match = re.search( - r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*' - r'MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)', - full_returnvalue_string + r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*" + r"MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)", + full_returnvalue_string, ) if returnvalue_match: number_match = returnvalue_match.group(2) @@ -231,17 +267,19 @@ class ReturnValueParser(FileParser): if returnvalue_match: description = self.__search_for_descrip_string(moving_window=moving_window) if number_match == INVALID_IF_ID: - LOGGER.warning(f'Invalid number match detected for file {file_name}') - LOGGER.warning(f'Match groups:') + LOGGER.warning(f"Invalid number match detected for file {file_name}") + LOGGER.warning(f"Match groups:") for group in returnvalue_match.groups(): LOGGER.info(group) self.__handle_returnvalue_match( - name_match=returnvalue_match.group(1), file_name=file_name, - number_match=number_match, description=description + name_match=returnvalue_match.group(1), + file_name=file_name, + number_match=number_match, + description=description, ) def __build_multi_line_returnvalue_string( - self, first_line: str, moving_window: List[str] + self, first_line: str, moving_window: List[str] ) -> str: return self._build_multi_line_string_generic( first_line=first_line, moving_window=moving_window @@ -249,29 +287,33 @@ class ReturnValueParser(FileParser): def __search_for_descrip_string(self, moving_window: List[str]) -> str: return self._search_for_descrip_string_generic( - moving_window=moving_window, break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t" + moving_window=moving_window, + break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t", ) - + def __handle_line_reading(self, line, file_name, print_truncated_entries: bool): newline = line - if self.last_lines[0] != '\n': - two_lines = self.last_lines[0] + ' ' + newline.strip() + if self.last_lines[0] != "\n": + two_lines = self.last_lines[0] + " " + newline.strip() else: - two_lines = '' - interface_id_match = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)', - two_lines) + two_lines = "" + interface_id_match = re.search( + r"INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)", two_lines + ) if interface_id_match: self.__handle_interfaceid_match(interface_id_match, file_name=file_name) returnvalue_match = re.search( - r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*' - r'MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)', - two_lines + r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*" + r"MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)", + two_lines, ) if returnvalue_match: self.__handle_returnvalue_match( - name_match=returnvalue_match.group(1), file_name=file_name, description="", - number_match=returnvalue_match.group(2) + name_match=returnvalue_match.group(1), + file_name=file_name, + description="", + number_match=returnvalue_match.group(2), ) self.last_lines[1] = self.last_lines[0] self.last_lines[0] = newline @@ -281,43 +323,50 @@ class ReturnValueParser(FileParser): Returns whether the interface ID was found successfully in the IF ID header files """ if self.get_verbosity() == VerbosityLevels.DEBUG: - LOGGER.info(f'Interface ID {interface_id_match.group(1)} found in {file_name}') + LOGGER.info( + f"Interface ID {interface_id_match.group(1)} found in {file_name}" + ) if_id_entry = self.interfaces.get(interface_id_match.group(1)) if if_id_entry is not None: self.current_interface_id_entries["ID"] = if_id_entry[0] else: LOGGER.warning( - f'Interface ID {interface_id_match.group(1)} not found in IF ID dictionary' + f"Interface ID {interface_id_match.group(1)} not found in IF ID dictionary" ) return False - self.current_interface_id_entries["Name"] = \ - self.interfaces[interface_id_match.group(1)][1] + self.current_interface_id_entries["Name"] = self.interfaces[ + interface_id_match.group(1) + ][1] self.current_interface_id_entries["FullName"] = interface_id_match.group(1) if self.get_verbosity() == VerbosityLevels.DEBUG: current_id = self.current_interface_id_entries["ID"] - LOGGER.info(f'Current ID: {current_id}') + LOGGER.info(f"Current ID: {current_id}") return True def __handle_returnvalue_match( - self, name_match: str, number_match: str, file_name: str, description: str + self, name_match: str, number_match: str, file_name: str, description: str ): string_to_add = self.build_checked_string( - self.current_interface_id_entries["Name"], name_match, MAX_STRING_LEN, - PRINT_TRUNCATED_ENTRIES + self.current_interface_id_entries["Name"], + name_match, + MAX_STRING_LEN, + PRINT_TRUNCATED_ENTRIES, ) - full_id = (self.current_interface_id_entries["ID"] << 8) + \ - return_number_from_string(number_match) + full_id = ( + self.current_interface_id_entries["ID"] << 8 + ) + return_number_from_string(number_match) if full_id in self.return_value_dict: # print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name + # ' was already in ' + self.return_value_dict[full_id][3]) pass dict_tuple = ( - string_to_add, description, number_match, file_name, - self.current_interface_id_entries["FullName"] + string_to_add, + description, + number_match, + file_name, + self.current_interface_id_entries["FullName"], ) - self.return_value_dict.update({ - full_id: dict_tuple - }) + self.return_value_dict.update({full_id: dict_tuple}) self.count = self.count + 1 def _post_parsing_operation(self): @@ -330,19 +379,33 @@ class ReturnValueParser(FileParser): file = open(filename, "w") for entry in list_of_entries.items(): file.write( - hex(entry[0]) + file_separator + entry[1][0] + file_separator + entry[1][1] + - file_separator + entry[1][2] + file_separator + entry[1][3] + file_separator + - entry[1][4] + '\n' + hex(entry[0]) + + file_separator + + entry[1][0] + + file_separator + + entry[1][1] + + file_separator + + entry[1][2] + + file_separator + + entry[1][3] + + file_separator + + entry[1][4] + + "\n" ) file.close() - def build_checked_string(self, first_part, second_part, max_string_len: int, - print_truncated_entries: bool): - """ Build a checked string """ - my_str = first_part + '_' + self.convert(second_part) + def build_checked_string( + self, + first_part, + second_part, + max_string_len: int, + print_truncated_entries: bool, + ): + """Build a checked string""" + my_str = first_part + "_" + self.convert(second_part) if len(my_str) > max_string_len: if print_truncated_entries: - LOGGER.warning(f'Entry {my_str} too long. Will truncate.') + LOGGER.warning(f"Entry {my_str} too long. Will truncate.") my_str = my_str[0:max_string_len] else: # print("Entry: " + myStr + " is all right.") @@ -351,8 +414,8 @@ class ReturnValueParser(FileParser): @staticmethod def convert(name): - single_strings = name.split('_') - new_string = '' + single_strings = name.split("_") + new_string = "" for one_string in single_strings: one_string = one_string.lower() one_string = one_string.capitalize() @@ -361,17 +424,16 @@ class ReturnValueParser(FileParser): @staticmethod def clean_up_description(descr_string): - description = descr_string.lstrip('!<- ') - if description == '': - description = ' ' + description = descr_string.lstrip("!<- ") + if description == "": + description = " " return description def return_number_from_string(a_string): - if a_string.startswith('0x'): + if a_string.startswith("0x"): return int(a_string, 16) if a_string.isdigit(): return int(a_string) - LOGGER.warning(f'Illegal number representation: {a_string}') + LOGGER.warning(f"Illegal number representation: {a_string}") return 0 - diff --git a/utility/csv_writer.py b/utility/csv_writer.py index 9095c42..d295807 100644 --- a/utility/csv_writer.py +++ b/utility/csv_writer.py @@ -4,7 +4,11 @@ from fsfwgen.utility.file_management import copy_file, move_file # TODO: Export to SQL class CsvWriter: def __init__( - self, filename: str, table_to_print=None, header_array=None, file_separator: str = "," + self, + filename: str, + table_to_print=None, + header_array=None, + file_separator: str = ",", ): if header_array is None: header_array = [] @@ -22,7 +26,7 @@ class CsvWriter: file.write("Index" + self.file_separator) for index in range(self.column_numbers): # noinspection PyTypeChecker - if index < len(self.header_array)-1: + if index < len(self.header_array) - 1: file.write(self.header_array[index] + self.file_separator) else: file.write(self.header_array[index] + "\n") diff --git a/utility/file_management.py b/utility/file_management.py index ebf74cf..f15fb2e 100644 --- a/utility/file_management.py +++ b/utility/file_management.py @@ -2,36 +2,37 @@ import shutil import os from fsfwgen.core import get_console_logger + LOGGER = get_console_logger() def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False): if not os.path.exists(filename): - LOGGER.warning(f'File {filename} does not exist') + LOGGER.warning(f"File {filename} does not exist") return if not os.path.exists(destination): - LOGGER.warning(f'Destination directory {destination} does not exist') + LOGGER.warning(f"Destination directory {destination} does not exist") return try: shutil.copy2(filename, destination) except FileNotFoundError: - LOGGER.exception('File not found!') + LOGGER.exception("File not found!") except shutil.SameFileError: - LOGGER.exception('Source and destination are the same!') + LOGGER.exception("Source and destination are the same!") def move_file(file_name: str, destination: str = ""): if not os.path.exists(file_name): - print(f'move_file: File {file_name} does not exist') + print(f"move_file: File {file_name} does not exist") return if not os.path.exists(destination): - print(f'move_file: Destination directory {destination} does not exist') + print(f"move_file: Destination directory {destination} does not exist") return try: shutil.copy2(file_name, destination) os.remove(file_name) return except FileNotFoundError: - LOGGER.exception('File not found!') + LOGGER.exception("File not found!") except shutil.SameFileError: - LOGGER.exception('Source and destination are the same!') + LOGGER.exception("Source and destination are the same!") diff --git a/utility/printer.py b/utility/printer.py index 9ff82f4..23a17d1 100644 --- a/utility/printer.py +++ b/utility/printer.py @@ -13,4 +13,3 @@ class Printer: print(leading_string) PrettyPrinter.pprint(dictionary) print("\r\n", end="") - diff --git a/utility/sql_writer.py b/utility/sql_writer.py index 647740c..c232eb2 100644 --- a/utility/sql_writer.py +++ b/utility/sql_writer.py @@ -11,7 +11,7 @@ class SqlWriter: self.conn = sqlite3.connect(self.filename) def open(self, sql_creation_command: str): - LOGGER.info(f'SQL Writer: Opening {self.filename}') + LOGGER.info(f"SQL Writer: Opening {self.filename}") self.conn.execute(sql_creation_command) def delete(self, sql_deletion_command): @@ -31,7 +31,7 @@ class SqlWriter: self.conn.close() def sql_writing_helper( - self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = "" + self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = "" ): if deletion_cmd != "": self.delete(deletion_cmd)