more improvements and fixes for fsfwgen

This commit is contained in:
Robin Müller 2022-06-21 00:51:13 +02:00
parent 36b44d1e26
commit 1b1ac86e8c
No known key found for this signature in database
GPG Key ID: 71B58F8A3CDFA9AC
9 changed files with 159 additions and 169 deletions

View File

@ -1,73 +1,7 @@
import enum import enum
import sys
import colorlog
import logging
import argparse import argparse
CONSOLE_LOGGER_NAME = "FSFW Generator Logger"
LOGGER_INSTANCE = None
class InfoFilter(logging.Filter):
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.INFO:
return rec.levelno
return None
class DebugFilter(logging.Filter):
"""Filter object, which is used so that only DEBUG messages are printed to stdout."""
def filter(self, rec):
if rec.levelno == logging.DEBUG:
return rec.levelno
return None
def get_console_logger():
global LOGGER_INSTANCE
if LOGGER_INSTANCE is None:
LOGGER_INSTANCE = init_console_logger()
return LOGGER_INSTANCE
def init_console_logger():
logger = colorlog.getLogger(CONSOLE_LOGGER_NAME)
generic_format = colorlog.ColoredFormatter(
"%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s"
)
fault_format = colorlog.ColoredFormatter(
fmt="%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d "
"[%(filename)s:%(lineno)d] %(reset)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
console_info_handler = colorlog.StreamHandler(stream=sys.stdout)
console_info_handler.setLevel(logging.INFO)
console_info_handler.addFilter(InfoFilter())
console_debug_handler = logging.StreamHandler(stream=sys.stdout)
console_debug_handler.setLevel(logging.DEBUG)
console_debug_handler.addFilter(DebugFilter())
console_info_handler.setFormatter(generic_format)
console_info_handler.addFilter(InfoFilter())
console_debug_handler.addFilter(DebugFilter())
console_error_handler = logging.StreamHandler(stream=sys.stderr)
console_error_handler.setLevel(logging.WARNING)
console_error_handler.setFormatter(fault_format)
logger.addHandler(console_info_handler)
logger.addHandler(console_debug_handler)
logger.addHandler(console_error_handler)
logger.setLevel(logging.DEBUG)
return logger
class ParserTypes(enum.Enum): class ParserTypes(enum.Enum):
EVENTS = "events" EVENTS = "events"
OBJECTS = "objects" OBJECTS = "objects"
@ -76,8 +10,6 @@ class ParserTypes(enum.Enum):
def init_printout(project_string: str): def init_printout(project_string: str):
global LOGGER_INSTANCE
LOGGER_INSTANCE = get_console_logger()
print(f"-- {project_string} MIB Generator --") print(f"-- {project_string} MIB Generator --")

View File

@ -1,10 +1,11 @@
import re import re
import os import os
import logging
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple from typing import List, Optional, Tuple, Dict
from fsfwgen.parserbase.parser import FileParser from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -50,31 +51,40 @@ class SubsystemDefinitionParser(FileParser):
pass pass
class EventEntry:
def __init__(self, name: str, severity: str, description: str, file_name: Path):
self.name = name
self.severity = severity
self.description = description
self.file_name = file_name
def __repr__(self):
return (
f"EventEntry(name={self.name!r}, severity={self.severity!r}, "
f"description={self.description!r}, file_name={self.file_name!r}"
)
EventDictT = Dict[int, EventEntry]
class EventParser(FileParser): class EventParser(FileParser):
def __init__(self, file_list: List[Path], interface_list): def __init__(
self, file_list: List[Path], interface_list, moving_window_size: int = 7
):
super().__init__(file_list) super().__init__(file_list)
self.set_moving_window_mode(moving_window_size)
self.interfaces = interface_list self.interfaces = interface_list
self.count = 0 self.count = 0
self.my_id = 0 self.my_id = 0
self.current_id = 0 self.current_id = 0
self.mib_table: EventDictT = dict()
self.obsw_root_path: Optional[Path] = None self.obsw_root_path: Optional[Path] = None
self.last_lines = ["", "", ""] self.last_lines = ["", "", ""]
self.moving_window_center_idx = 3 self.moving_window_center_idx = 3
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs): def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
try: logging.warning("Regular file parsing mode not implemented")
file = open(file_name, "r", encoding="utf-8")
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, "r", encoding="cp1252")
all_lines = file.readlines()
total_count = 0
for line in all_lines:
self.__handle_line_reading(line, file_name)
if self.count > 0:
print(f"File {file_name} contained {self.count} events.")
total_count += self.count
self.count = 0
def _handle_file_parsing_moving_window( def _handle_file_parsing_moving_window(
self, self,
@ -159,14 +169,16 @@ class EventParser(FileParser):
) )
severity = event_full_match.group(4) severity = event_full_match.group(4)
if self.obsw_root_path is not None: if self.obsw_root_path is not None:
file_name = os.path.relpath(file_name, self.obsw_root_path) file_name = file_name.relative_to(self.obsw_root_path)
if self.mib_table.get(full_id) is not None: if self.mib_table.get(full_id) is not None:
LOGGER.warning(f"Duplicate event ID {full_id} detected") LOGGER.warning(f"Duplicate event ID {full_id} detected")
LOGGER.info( LOGGER.info(
f"Name: {self.mib_table.get(full_id)[0]}| " f"Name: {self.mib_table.get(full_id).name}| "
f"Description: {self.mib_table.get(full_id)[2]}" f"Description: {self.mib_table.get(full_id).description}"
) )
self.mib_table.update({full_id: (name, severity, description, file_name)}) self.mib_table.update(
{full_id: EventEntry(name, severity, description, file_name)}
)
self.count = self.count + 1 self.count = self.count + 1
@staticmethod @staticmethod
@ -195,54 +207,11 @@ class EventParser(FileParser):
def _post_parsing_operation(self): def _post_parsing_operation(self):
pass pass
def __handle_line_reading(self, line, file_name: Path):
if not self.last_lines[0] == "\n":
twolines = self.last_lines[0] + " " + line.strip()
else:
twolines = ""
match1 = re.search(
r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
)
if match1:
self.current_id = self.interfaces[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
self.my_id = self.return_number_from_string(self.current_id)
match = re.search(
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
twolines,
)
if match:
if match.group(1):
self.last_lines[0] = line
return
description = " "
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
full_id = (self.my_id * 100) + self.return_number_from_string(
match.group(3)
)
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
if self.obsw_root_path is not None:
file_name = file_name.relative_to(self.obsw_root_path)
self.mib_table.update(
{full_id: (string_to_add, severity, description, file_name)}
)
self.count = self.count + 1
self.last_lines[0] = line
def build_checked_string(self, first_part, second_part): def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part) my_str = first_part + self.convert(second_part)
if len(my_str) > 16: if len(my_str) > 16:
print(f"EventParser: Entry: {my_str} too long. Will truncate.") print(f"EventParser: Entry: {my_str} too long. Will truncate.")
my_str = my_str[0:14] my_str = my_str[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return my_str return my_str
@staticmethod @staticmethod
@ -274,46 +243,37 @@ class EventParser(FileParser):
return description return description
def export_to_file( def export_to_file(filename: Path, event_list: EventDictT, file_separator: str):
filename: Path, event_list: List[Tuple[str, List]], file_separator: str
):
file = open(filename, "w") file = open(filename, "w")
fsep = file_separator fsep = file_separator
for entry in event_list: for entry in event_list.items():
event_id = int(entry[0]) event_id = int(entry[0])
event_value = entry[1] event_value = entry[1]
event_id_as_hex = f"{event_id:#06x}" event_id_as_hex = f"{event_id:#06x}"
file.write( file.write(
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value[EVENT_ENTRY_NAME_IDX]}{fsep}" f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}"
f"{event_value[EVENT_ENTRY_SEVERITY_IDX]}{fsep}{event_value[EVENT_ENTRY_INFO_IDX]}" f"{event_value.severity}{fsep}{event_value.description}"
f"{fsep}{event_value[EVENT_SOURCE_FILE_IDX].as_posix()}\n" f"{fsep}{event_value.file_name.as_posix()}\n"
) )
file.close() file.close()
def write_translation_source_file( def write_translation_source_file(
event_list: list, date_string: str, filename: Path = "translateEvents.cpp" event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp"
): ):
with open(filename, "w") as out: with open(filename, "w") as out:
definitions = "" definitions = ""
# Look up table to avoid duplicate events # Look up table to avoid duplicate events
lut = dict() lut = dict()
function = ( function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
"const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n" for entry in event_list.items():
)
for entry in event_list:
event_id = entry[0] event_id = entry[0]
event_value = entry[1] event_value = entry[1]
if event_value[EVENT_ENTRY_NAME_IDX] not in lut: name = event_value.name
definitions += ( if name not in lut:
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " definitions += f"const char *{name}_STRING " f'= "{name}";\n'
f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n' function += f" case ({event_id}):\n " f"return {name}_STRING;\n"
) lut.update({name: event_value})
function += (
f" case ({event_id}):\n "
f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
)
lut.update({event_value[EVENT_ENTRY_NAME_IDX]: event_value})
function += ' default:\n return "UNKNOWN_EVENT";\n' function += ' default:\n return "UNKNOWN_EVENT";\n'
out.write( out.write(
f"/**\n * @brief Auto-generated event translation file. " f"/**\n * @brief Auto-generated event translation file. "

97
fsfwgen/logging.py Normal file
View File

@ -0,0 +1,97 @@
import logging
import sys
from typing import Optional
from colorlog import ColoredFormatter
FSFWGEN_LOGGER_NAME = "fsfwgen"
__CONSOLE_LOGGER_SET_UP: Optional[logging.Logger] = None
def get_console_logger() -> logging.Logger:
global __CONSOLE_LOGGER_SET_UP
"""Get the global console logger instance. Error logs will still be saved to an error file
"""
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
if not __CONSOLE_LOGGER_SET_UP:
__CONSOLE_LOGGER_SET_UP = True
__setup_tmtc_console_logger()
return logger
def init_console_logger(log_level: int = logging.INFO) -> logging.Logger:
global __CONSOLE_LOGGER_SET_UP
if not __CONSOLE_LOGGER_SET_UP:
__CONSOLE_LOGGER_SET_UP = True
return __setup_tmtc_console_logger(log_level=log_level)
return get_console_logger()
def __setup_tmtc_console_logger(log_level: int = logging.INFO) -> logging.Logger:
"""Sets the LOGGER object which will be used globally. This needs to be called before
using the logger.
:return: Returns the instance of the global logger
"""
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
# Use colorlog for now because it allows more flexibility and custom messages
# for different levels
set_up_colorlog_logger(logger=logger)
logger.setLevel(level=log_level)
# set_up_coloredlogs_logger(logger=logger)
return logger
# Custom formatter. Allows different strings for info, error and debug output
class CustomTmtccmdFormatter(ColoredFormatter):
def __init__(
self, info_fmt: str, dbg_fmt: str, err_fmt: str, warn_fmt: str, datefmt=None
):
self.err_fmt = err_fmt
self.info_fmt = info_fmt
self.dbg_fmt = dbg_fmt
self.warn_fmt = warn_fmt
super().__init__(fmt="%(levelno)d: %(msg)s", datefmt=datefmt, style="%")
def format(self, record):
# Save the original format configured by the user
# when the logger formatter was instantiated
format_orig = self._style._fmt
# Replace the original format with one customized by logging level
if record.levelno == logging.DEBUG:
self._style._fmt = self.dbg_fmt
elif record.levelno == logging.INFO:
self._style._fmt = self.info_fmt
elif record.levelno == logging.ERROR:
self._style._fmt = self.err_fmt
elif record.levelno == logging.WARNING:
self._style._fmt = self.warn_fmt
# Call the original formatter class to do the grunt work
result = logging.Formatter.format(self, record)
# Restore the original format configured by the user
self._style._fmt = format_orig
return result
def set_up_colorlog_logger(logger: logging.Logger):
from colorlog import StreamHandler
dbg_fmt = "%(log_color)s%(levelname)-8s %(cyan)s [%(filename)s:%(lineno)d] %(reset)s%(message)s"
custom_formatter = CustomTmtccmdFormatter(
info_fmt="%(log_color)s%(levelname)-8s %(cyan)s %(reset)s%(message)s",
dbg_fmt=dbg_fmt,
err_fmt=dbg_fmt,
warn_fmt=dbg_fmt,
)
console_handler = StreamHandler(stream=sys.stdout)
console_handler.setFormatter(custom_formatter)
logger.addHandler(console_handler)
logger.propagate = False

View File

@ -3,7 +3,7 @@ from pathlib import Path
from typing import List from typing import List
from fsfwgen.parserbase.parser import FileParser from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
from fsfwgen.utility.sql_writer import SqlWriter from fsfwgen.utility.sql_writer import SqlWriter

View File

@ -6,7 +6,7 @@ import re
from pathlib import Path from pathlib import Path
from typing import Union, List from typing import Union, List
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()

View File

@ -16,6 +16,7 @@ import re
from abc import abstractmethod from abc import abstractmethod
from pathlib import Path from pathlib import Path
from typing import Dict, List from typing import Dict, List
from enum import Enum, auto
class VerbosityLevels(enum.Enum): class VerbosityLevels(enum.Enum):
@ -24,9 +25,9 @@ class VerbosityLevels(enum.Enum):
DEBUG = 2 DEBUG = 2
class FileParserModes(enum.Enum): class FileParserModes(Enum):
REGULAR = enum.auto REGULAR = auto()
MOVING_WINDOW = enum.auto MOVING_WINDOW = auto()
class FileParser: class FileParser:
@ -262,9 +263,7 @@ class FileParser:
descrip_match = re.search( descrip_match = re.search(
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx] r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]
) )
if descrip_match: if descrip_match or current_idx <= 0:
break
if current_idx <= 0:
break break
current_idx -= 1 current_idx -= 1
if descrip_match: if descrip_match:

View File

@ -1,4 +1,3 @@
import os.path
import re import re
import sys import sys
from pathlib import Path from pathlib import Path
@ -6,7 +5,7 @@ from typing import List, Tuple, Optional, Dict
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels from fsfwgen.parserbase.parser import FileParser, VerbosityLevels
from fsfwgen.utility.printer import PrettyPrinter from fsfwgen.utility.printer import PrettyPrinter
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()
@ -178,8 +177,9 @@ class RetvalEntry:
def __repr__(self): def __repr__(self):
return ( return (
f"RetvalEntry(name={self.name!r}, description={self.description!r}, unique_id={self.unique_id!r}, " f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
f"file_name={self.file_name!r}, subsystem_name={self.subsystem_name!r})" f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
f"subsystem_name={self.subsystem_name!r})"
) )
@ -405,7 +405,7 @@ class ReturnValueParser(FileParser):
self.mib_table = self.return_value_dict self.mib_table = self.return_value_dict
@staticmethod @staticmethod
def export_to_file(filename: str, list_of_entries: RetvalDictT, file_sep: str): def export_to_file(filename: Path, list_of_entries: RetvalDictT, file_sep: str):
file = open(filename, "w") file = open(filename, "w")
for entry in list_of_entries.items(): for entry in list_of_entries.items():
file.write( file.write(

View File

@ -3,12 +3,14 @@ import shutil
import os import os
from pathlib import Path from pathlib import Path
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()
def copy_file(filename: Path, destination: Path = "", delete_existing_file: bool = False): def copy_file(
filename: Path, destination: Path = "", delete_existing_file: bool = False
):
if not os.path.exists(filename): if not os.path.exists(filename):
LOGGER.warning(f"File {filename} does not exist") LOGGER.warning(f"File {filename} does not exist")
return return

View File

@ -1,6 +1,6 @@
import sqlite3 import sqlite3
from fsfwgen.core import get_console_logger from fsfwgen.logging import get_console_logger
LOGGER = get_console_logger() LOGGER = get_console_logger()