From 1b1ac86e8c9353b8dba0f610c26a13583d02f6db Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 21 Jun 2022 00:51:13 +0200 Subject: [PATCH] more improvements and fixes for fsfwgen --- fsfwgen/core.py | 68 ---------- fsfwgen/events/event_parser.py | 130 +++++++------------- fsfwgen/logging.py | 97 +++++++++++++++ fsfwgen/objects/objects.py | 2 +- fsfwgen/parserbase/file_list_parser.py | 2 +- fsfwgen/parserbase/parser.py | 11 +- fsfwgen/returnvalues/returnvalues_parser.py | 10 +- fsfwgen/utility/file_management.py | 6 +- fsfwgen/utility/sql_writer.py | 2 +- 9 files changed, 159 insertions(+), 169 deletions(-) create mode 100644 fsfwgen/logging.py diff --git a/fsfwgen/core.py b/fsfwgen/core.py index e1d1438..19c5a90 100644 --- a/fsfwgen/core.py +++ b/fsfwgen/core.py @@ -1,73 +1,7 @@ import enum -import sys - -import colorlog -import logging import argparse -CONSOLE_LOGGER_NAME = "FSFW Generator Logger" -LOGGER_INSTANCE = None - - -class InfoFilter(logging.Filter): - """Filter object, which is used so that only INFO and DEBUG messages are printed to stdout.""" - - def filter(self, rec): - if rec.levelno == logging.INFO: - return rec.levelno - return None - - -class DebugFilter(logging.Filter): - """Filter object, which is used so that only DEBUG messages are printed to stdout.""" - - def filter(self, rec): - if rec.levelno == logging.DEBUG: - return rec.levelno - return None - - -def get_console_logger(): - global LOGGER_INSTANCE - if LOGGER_INSTANCE is None: - LOGGER_INSTANCE = init_console_logger() - return LOGGER_INSTANCE - - -def init_console_logger(): - logger = colorlog.getLogger(CONSOLE_LOGGER_NAME) - generic_format = colorlog.ColoredFormatter( - "%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s" - ) - fault_format = colorlog.ColoredFormatter( - fmt="%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d " - "[%(filename)s:%(lineno)d] %(reset)s%(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - console_info_handler = colorlog.StreamHandler(stream=sys.stdout) - console_info_handler.setLevel(logging.INFO) - console_info_handler.addFilter(InfoFilter()) - - console_debug_handler = logging.StreamHandler(stream=sys.stdout) - console_debug_handler.setLevel(logging.DEBUG) - console_debug_handler.addFilter(DebugFilter()) - - console_info_handler.setFormatter(generic_format) - console_info_handler.addFilter(InfoFilter()) - console_debug_handler.addFilter(DebugFilter()) - console_error_handler = logging.StreamHandler(stream=sys.stderr) - console_error_handler.setLevel(logging.WARNING) - console_error_handler.setFormatter(fault_format) - - logger.addHandler(console_info_handler) - logger.addHandler(console_debug_handler) - logger.addHandler(console_error_handler) - logger.setLevel(logging.DEBUG) - return logger - - class ParserTypes(enum.Enum): EVENTS = "events" OBJECTS = "objects" @@ -76,8 +10,6 @@ class ParserTypes(enum.Enum): def init_printout(project_string: str): - global LOGGER_INSTANCE - LOGGER_INSTANCE = get_console_logger() print(f"-- {project_string} MIB Generator --") diff --git a/fsfwgen/events/event_parser.py b/fsfwgen/events/event_parser.py index 47751d7..eca5251 100644 --- a/fsfwgen/events/event_parser.py +++ b/fsfwgen/events/event_parser.py @@ -1,10 +1,11 @@ import re import os +import logging from pathlib import Path -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Dict from fsfwgen.parserbase.parser import FileParser -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger LOGGER = get_console_logger() @@ -50,31 +51,40 @@ class SubsystemDefinitionParser(FileParser): pass +class EventEntry: + def __init__(self, name: str, severity: str, description: str, file_name: Path): + self.name = name + self.severity = severity + self.description = description + self.file_name = file_name + + def __repr__(self): + return ( + f"EventEntry(name={self.name!r}, severity={self.severity!r}, " + f"description={self.description!r}, file_name={self.file_name!r}" + ) + + +EventDictT = Dict[int, EventEntry] + + class EventParser(FileParser): - def __init__(self, file_list: List[Path], interface_list): + def __init__( + self, file_list: List[Path], interface_list, moving_window_size: int = 7 + ): super().__init__(file_list) + self.set_moving_window_mode(moving_window_size) self.interfaces = interface_list self.count = 0 self.my_id = 0 self.current_id = 0 + self.mib_table: EventDictT = dict() self.obsw_root_path: Optional[Path] = None self.last_lines = ["", "", ""] self.moving_window_center_idx = 3 def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs): - try: - file = open(file_name, "r", encoding="utf-8") - all_lines = file.readlines() - except UnicodeDecodeError: - file = open(file_name, "r", encoding="cp1252") - all_lines = file.readlines() - total_count = 0 - for line in all_lines: - self.__handle_line_reading(line, file_name) - if self.count > 0: - print(f"File {file_name} contained {self.count} events.") - total_count += self.count - self.count = 0 + logging.warning("Regular file parsing mode not implemented") def _handle_file_parsing_moving_window( self, @@ -159,14 +169,16 @@ class EventParser(FileParser): ) severity = event_full_match.group(4) if self.obsw_root_path is not None: - file_name = os.path.relpath(file_name, self.obsw_root_path) + file_name = file_name.relative_to(self.obsw_root_path) if self.mib_table.get(full_id) is not None: LOGGER.warning(f"Duplicate event ID {full_id} detected") LOGGER.info( - f"Name: {self.mib_table.get(full_id)[0]}| " - f"Description: {self.mib_table.get(full_id)[2]}" + f"Name: {self.mib_table.get(full_id).name}| " + f"Description: {self.mib_table.get(full_id).description}" ) - self.mib_table.update({full_id: (name, severity, description, file_name)}) + self.mib_table.update( + {full_id: EventEntry(name, severity, description, file_name)} + ) self.count = self.count + 1 @staticmethod @@ -195,54 +207,11 @@ class EventParser(FileParser): def _post_parsing_operation(self): pass - def __handle_line_reading(self, line, file_name: Path): - if not self.last_lines[0] == "\n": - twolines = self.last_lines[0] + " " + line.strip() - else: - twolines = "" - match1 = re.search( - r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines - ) - if match1: - self.current_id = self.interfaces[match1.group(1)][0] - # print( "Current ID: " + str(currentId) ) - self.my_id = self.return_number_from_string(self.current_id) - match = re.search( - r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*" - r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", - twolines, - ) - if match: - if match.group(1): - self.last_lines[0] = line - return - description = " " - if match.group(6): - description = self.clean_up_description(match.group(6)) - string_to_add = match.group(2) - full_id = (self.my_id * 100) + self.return_number_from_string( - match.group(3) - ) - severity = match.group(4) - if full_id in self.mib_table: - # print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name + - # " was already in " + self.mib_table[full_id][3]) - pass - if self.obsw_root_path is not None: - file_name = file_name.relative_to(self.obsw_root_path) - self.mib_table.update( - {full_id: (string_to_add, severity, description, file_name)} - ) - self.count = self.count + 1 - self.last_lines[0] = line - def build_checked_string(self, first_part, second_part): my_str = first_part + self.convert(second_part) if len(my_str) > 16: print(f"EventParser: Entry: {my_str} too long. Will truncate.") my_str = my_str[0:14] - # else: - # print( "Entry: " + myStr + " is all right.") return my_str @staticmethod @@ -274,46 +243,37 @@ class EventParser(FileParser): return description -def export_to_file( - filename: Path, event_list: List[Tuple[str, List]], file_separator: str -): +def export_to_file(filename: Path, event_list: EventDictT, file_separator: str): file = open(filename, "w") fsep = file_separator - for entry in event_list: + for entry in event_list.items(): event_id = int(entry[0]) event_value = entry[1] event_id_as_hex = f"{event_id:#06x}" file.write( - f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value[EVENT_ENTRY_NAME_IDX]}{fsep}" - f"{event_value[EVENT_ENTRY_SEVERITY_IDX]}{fsep}{event_value[EVENT_ENTRY_INFO_IDX]}" - f"{fsep}{event_value[EVENT_SOURCE_FILE_IDX].as_posix()}\n" + f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}" + f"{event_value.severity}{fsep}{event_value.description}" + f"{fsep}{event_value.file_name.as_posix()}\n" ) file.close() def write_translation_source_file( - event_list: list, date_string: str, filename: Path = "translateEvents.cpp" + event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp" ): with open(filename, "w") as out: definitions = "" # Look up table to avoid duplicate events lut = dict() - function = ( - "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n" - ) - for entry in event_list: + function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n" + for entry in event_list.items(): event_id = entry[0] event_value = entry[1] - if event_value[EVENT_ENTRY_NAME_IDX] not in lut: - definitions += ( - f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " - f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n' - ) - function += ( - f" case ({event_id}):\n " - f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n" - ) - lut.update({event_value[EVENT_ENTRY_NAME_IDX]: event_value}) + name = event_value.name + if name not in lut: + definitions += f"const char *{name}_STRING " f'= "{name}";\n' + function += f" case ({event_id}):\n " f"return {name}_STRING;\n" + lut.update({name: event_value}) function += ' default:\n return "UNKNOWN_EVENT";\n' out.write( f"/**\n * @brief Auto-generated event translation file. " diff --git a/fsfwgen/logging.py b/fsfwgen/logging.py new file mode 100644 index 0000000..eef5406 --- /dev/null +++ b/fsfwgen/logging.py @@ -0,0 +1,97 @@ +import logging +import sys +from typing import Optional +from colorlog import ColoredFormatter + + +FSFWGEN_LOGGER_NAME = "fsfwgen" +__CONSOLE_LOGGER_SET_UP: Optional[logging.Logger] = None + + +def get_console_logger() -> logging.Logger: + global __CONSOLE_LOGGER_SET_UP + """Get the global console logger instance. Error logs will still be saved to an error file + """ + logger = logging.getLogger(FSFWGEN_LOGGER_NAME) + if not __CONSOLE_LOGGER_SET_UP: + __CONSOLE_LOGGER_SET_UP = True + __setup_tmtc_console_logger() + return logger + + +def init_console_logger(log_level: int = logging.INFO) -> logging.Logger: + global __CONSOLE_LOGGER_SET_UP + if not __CONSOLE_LOGGER_SET_UP: + __CONSOLE_LOGGER_SET_UP = True + return __setup_tmtc_console_logger(log_level=log_level) + return get_console_logger() + + +def __setup_tmtc_console_logger(log_level: int = logging.INFO) -> logging.Logger: + """Sets the LOGGER object which will be used globally. This needs to be called before + using the logger. + :return: Returns the instance of the global logger + """ + logger = logging.getLogger(FSFWGEN_LOGGER_NAME) + # Use colorlog for now because it allows more flexibility and custom messages + # for different levels + set_up_colorlog_logger(logger=logger) + logger.setLevel(level=log_level) + # set_up_coloredlogs_logger(logger=logger) + return logger + + +# Custom formatter. Allows different strings for info, error and debug output +class CustomTmtccmdFormatter(ColoredFormatter): + def __init__( + self, info_fmt: str, dbg_fmt: str, err_fmt: str, warn_fmt: str, datefmt=None + ): + self.err_fmt = err_fmt + self.info_fmt = info_fmt + self.dbg_fmt = dbg_fmt + self.warn_fmt = warn_fmt + super().__init__(fmt="%(levelno)d: %(msg)s", datefmt=datefmt, style="%") + + def format(self, record): + + # Save the original format configured by the user + # when the logger formatter was instantiated + format_orig = self._style._fmt + + # Replace the original format with one customized by logging level + if record.levelno == logging.DEBUG: + self._style._fmt = self.dbg_fmt + + elif record.levelno == logging.INFO: + self._style._fmt = self.info_fmt + + elif record.levelno == logging.ERROR: + self._style._fmt = self.err_fmt + + elif record.levelno == logging.WARNING: + self._style._fmt = self.warn_fmt + + # Call the original formatter class to do the grunt work + result = logging.Formatter.format(self, record) + + # Restore the original format configured by the user + self._style._fmt = format_orig + + return result + + +def set_up_colorlog_logger(logger: logging.Logger): + from colorlog import StreamHandler + + dbg_fmt = "%(log_color)s%(levelname)-8s %(cyan)s [%(filename)s:%(lineno)d] %(reset)s%(message)s" + custom_formatter = CustomTmtccmdFormatter( + info_fmt="%(log_color)s%(levelname)-8s %(cyan)s %(reset)s%(message)s", + dbg_fmt=dbg_fmt, + err_fmt=dbg_fmt, + warn_fmt=dbg_fmt, + ) + + console_handler = StreamHandler(stream=sys.stdout) + console_handler.setFormatter(custom_formatter) + logger.addHandler(console_handler) + logger.propagate = False diff --git a/fsfwgen/objects/objects.py b/fsfwgen/objects/objects.py index faddb78..58ef9cd 100644 --- a/fsfwgen/objects/objects.py +++ b/fsfwgen/objects/objects.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import List from fsfwgen.parserbase.parser import FileParser -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger from fsfwgen.utility.sql_writer import SqlWriter diff --git a/fsfwgen/parserbase/file_list_parser.py b/fsfwgen/parserbase/file_list_parser.py index 11ecdd2..2aae5fa 100644 --- a/fsfwgen/parserbase/file_list_parser.py +++ b/fsfwgen/parserbase/file_list_parser.py @@ -6,7 +6,7 @@ import re from pathlib import Path from typing import Union, List -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger LOGGER = get_console_logger() diff --git a/fsfwgen/parserbase/parser.py b/fsfwgen/parserbase/parser.py index 6f6c713..f112853 100644 --- a/fsfwgen/parserbase/parser.py +++ b/fsfwgen/parserbase/parser.py @@ -16,6 +16,7 @@ import re from abc import abstractmethod from pathlib import Path from typing import Dict, List +from enum import Enum, auto class VerbosityLevels(enum.Enum): @@ -24,9 +25,9 @@ class VerbosityLevels(enum.Enum): DEBUG = 2 -class FileParserModes(enum.Enum): - REGULAR = enum.auto - MOVING_WINDOW = enum.auto +class FileParserModes(Enum): + REGULAR = auto() + MOVING_WINDOW = auto() class FileParser: @@ -262,9 +263,7 @@ class FileParser: descrip_match = re.search( r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx] ) - if descrip_match: - break - if current_idx <= 0: + if descrip_match or current_idx <= 0: break current_idx -= 1 if descrip_match: diff --git a/fsfwgen/returnvalues/returnvalues_parser.py b/fsfwgen/returnvalues/returnvalues_parser.py index 9ce4570..be3258d 100644 --- a/fsfwgen/returnvalues/returnvalues_parser.py +++ b/fsfwgen/returnvalues/returnvalues_parser.py @@ -1,4 +1,3 @@ -import os.path import re import sys from pathlib import Path @@ -6,7 +5,7 @@ from typing import List, Tuple, Optional, Dict from fsfwgen.parserbase.parser import FileParser, VerbosityLevels from fsfwgen.utility.printer import PrettyPrinter -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger LOGGER = get_console_logger() @@ -178,8 +177,9 @@ class RetvalEntry: def __repr__(self): return ( - f"RetvalEntry(name={self.name!r}, description={self.description!r}, unique_id={self.unique_id!r}, " - f"file_name={self.file_name!r}, subsystem_name={self.subsystem_name!r})" + f"RetvalEntry(name={self.name!r}, description={self.description!r}, " + f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, " + f"subsystem_name={self.subsystem_name!r})" ) @@ -405,7 +405,7 @@ class ReturnValueParser(FileParser): self.mib_table = self.return_value_dict @staticmethod - def export_to_file(filename: str, list_of_entries: RetvalDictT, file_sep: str): + def export_to_file(filename: Path, list_of_entries: RetvalDictT, file_sep: str): file = open(filename, "w") for entry in list_of_entries.items(): file.write( diff --git a/fsfwgen/utility/file_management.py b/fsfwgen/utility/file_management.py index a24b293..7a695d5 100644 --- a/fsfwgen/utility/file_management.py +++ b/fsfwgen/utility/file_management.py @@ -3,12 +3,14 @@ import shutil import os from pathlib import Path -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger LOGGER = get_console_logger() -def copy_file(filename: Path, destination: Path = "", delete_existing_file: bool = False): +def copy_file( + filename: Path, destination: Path = "", delete_existing_file: bool = False +): if not os.path.exists(filename): LOGGER.warning(f"File {filename} does not exist") return diff --git a/fsfwgen/utility/sql_writer.py b/fsfwgen/utility/sql_writer.py index c232eb2..d6d3688 100644 --- a/fsfwgen/utility/sql_writer.py +++ b/fsfwgen/utility/sql_writer.py @@ -1,6 +1,6 @@ import sqlite3 -from fsfwgen.core import get_console_logger +from fsfwgen.logging import get_console_logger LOGGER = get_console_logger()