Compare commits
10 Commits
6d423f7106
...
b1e5a2d40a
Author | SHA1 | Date | |
---|---|---|---|
b1e5a2d40a | |||
e84be4bb17 | |||
fd9838bcba | |||
9c412ace74 | |||
911aa0d89d | |||
a5dee6e417 | |||
a2e0c4f98e | |||
1b1ac86e8c | |||
36b44d1e26 | |||
fc191cc50e |
@@ -1,73 +1,7 @@
|
||||
import enum
|
||||
import sys
|
||||
|
||||
import colorlog
|
||||
import logging
|
||||
import argparse
|
||||
|
||||
|
||||
CONSOLE_LOGGER_NAME = "FSFW Generator Logger"
|
||||
LOGGER_INSTANCE = None
|
||||
|
||||
|
||||
class InfoFilter(logging.Filter):
|
||||
"""Filter object, which is used so that only INFO and DEBUG messages are printed to stdout."""
|
||||
|
||||
def filter(self, rec):
|
||||
if rec.levelno == logging.INFO:
|
||||
return rec.levelno
|
||||
return None
|
||||
|
||||
|
||||
class DebugFilter(logging.Filter):
|
||||
"""Filter object, which is used so that only DEBUG messages are printed to stdout."""
|
||||
|
||||
def filter(self, rec):
|
||||
if rec.levelno == logging.DEBUG:
|
||||
return rec.levelno
|
||||
return None
|
||||
|
||||
|
||||
def get_console_logger():
|
||||
global LOGGER_INSTANCE
|
||||
if LOGGER_INSTANCE is None:
|
||||
LOGGER_INSTANCE = init_console_logger()
|
||||
return LOGGER_INSTANCE
|
||||
|
||||
|
||||
def init_console_logger():
|
||||
logger = colorlog.getLogger(CONSOLE_LOGGER_NAME)
|
||||
generic_format = colorlog.ColoredFormatter(
|
||||
"%(log_color)s%(levelname)-8s | %(reset)s%(message)s%(reset)s"
|
||||
)
|
||||
fault_format = colorlog.ColoredFormatter(
|
||||
fmt="%(log_color)s%(levelname)-8s %(cyan)s%(asctime)s.%(msecs)03d "
|
||||
"[%(filename)s:%(lineno)d] %(reset)s%(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
|
||||
console_info_handler = colorlog.StreamHandler(stream=sys.stdout)
|
||||
console_info_handler.setLevel(logging.INFO)
|
||||
console_info_handler.addFilter(InfoFilter())
|
||||
|
||||
console_debug_handler = logging.StreamHandler(stream=sys.stdout)
|
||||
console_debug_handler.setLevel(logging.DEBUG)
|
||||
console_debug_handler.addFilter(DebugFilter())
|
||||
|
||||
console_info_handler.setFormatter(generic_format)
|
||||
console_info_handler.addFilter(InfoFilter())
|
||||
console_debug_handler.addFilter(DebugFilter())
|
||||
console_error_handler = logging.StreamHandler(stream=sys.stderr)
|
||||
console_error_handler.setLevel(logging.WARNING)
|
||||
console_error_handler.setFormatter(fault_format)
|
||||
|
||||
logger.addHandler(console_info_handler)
|
||||
logger.addHandler(console_debug_handler)
|
||||
logger.addHandler(console_error_handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
return logger
|
||||
|
||||
|
||||
class ParserTypes(enum.Enum):
|
||||
EVENTS = "events"
|
||||
OBJECTS = "objects"
|
||||
@@ -76,8 +10,6 @@ class ParserTypes(enum.Enum):
|
||||
|
||||
|
||||
def init_printout(project_string: str):
|
||||
global LOGGER_INSTANCE
|
||||
LOGGER_INSTANCE = get_console_logger()
|
||||
print(f"-- {project_string} MIB Generator --")
|
||||
|
||||
|
||||
|
@@ -1,16 +1,14 @@
|
||||
import re
|
||||
import os
|
||||
from typing import List
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from fsfwgen.parserbase.parser import FileParser
|
||||
from fsfwgen.core import get_console_logger
|
||||
from fsfwgen.logging import get_console_logger
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
EVENT_ENTRY_NAME_IDX = 0
|
||||
EVENT_ENTRY_SEVERITY_IDX = 1
|
||||
EVENT_ENTRY_INFO_IDX = 2
|
||||
EVENT_SOURCE_FILE_IDX = 3
|
||||
|
||||
FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"'
|
||||
DEFAULT_MOVING_WINDOWS_SIZE = 7
|
||||
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
|
||||
@@ -49,35 +47,44 @@ class SubsystemDefinitionParser(FileParser):
|
||||
pass
|
||||
|
||||
|
||||
class EventEntry:
|
||||
def __init__(self, name: str, severity: str, description: str, file_name: Path):
|
||||
self.name = name
|
||||
self.severity = severity
|
||||
self.description = description
|
||||
self.file_name = file_name
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"EventEntry(name={self.name!r}, severity={self.severity!r}, "
|
||||
f"description={self.description!r}, file_name={self.file_name!r}"
|
||||
)
|
||||
|
||||
|
||||
EventDictT = Dict[int, EventEntry]
|
||||
|
||||
|
||||
class EventParser(FileParser):
|
||||
def __init__(self, file_list: List[str], interface_list):
|
||||
def __init__(
|
||||
self, file_list: List[Path], interface_list, moving_window_size: int = 7
|
||||
):
|
||||
super().__init__(file_list)
|
||||
self.set_moving_window_mode(moving_window_size)
|
||||
self.interfaces = interface_list
|
||||
self.count = 0
|
||||
self.my_id = 0
|
||||
self.current_id = 0
|
||||
self.obsw_root_path = None
|
||||
self.mib_table: EventDictT = dict()
|
||||
self.obsw_root_path: Optional[Path] = None
|
||||
self.last_lines = ["", "", ""]
|
||||
self.moving_window_center_idx = 3
|
||||
|
||||
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
|
||||
try:
|
||||
file = open(file_name, "r", encoding="utf-8")
|
||||
all_lines = file.readlines()
|
||||
except UnicodeDecodeError:
|
||||
file = open(file_name, "r", encoding="cp1252")
|
||||
all_lines = file.readlines()
|
||||
total_count = 0
|
||||
for line in all_lines:
|
||||
self.__handle_line_reading(line, file_name)
|
||||
if self.count > 0:
|
||||
print("File " + file_name + " contained " + str(self.count) + " events.")
|
||||
total_count += self.count
|
||||
self.count = 0
|
||||
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
|
||||
logging.warning("Regular file parsing mode not implemented")
|
||||
|
||||
def _handle_file_parsing_moving_window(
|
||||
self,
|
||||
file_name: str,
|
||||
file_name: Path,
|
||||
current_line: int,
|
||||
moving_window_size: int,
|
||||
moving_window: list,
|
||||
@@ -127,7 +134,7 @@ class EventParser(FileParser):
|
||||
)
|
||||
|
||||
def __handle_event_match(
|
||||
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
|
||||
self, event_match, macro_api_match: bool, moving_window: list, file_name: Path
|
||||
):
|
||||
if ";" in event_match.group(0):
|
||||
event_full_match = self.__generate_regex_event_match(
|
||||
@@ -158,14 +165,16 @@ class EventParser(FileParser):
|
||||
)
|
||||
severity = event_full_match.group(4)
|
||||
if self.obsw_root_path is not None:
|
||||
file_name = os.path.relpath(file_name, self.obsw_root_path)
|
||||
file_name = file_name.relative_to(self.obsw_root_path)
|
||||
if self.mib_table.get(full_id) is not None:
|
||||
LOGGER.warning(f"Duplicate event ID {full_id} detected")
|
||||
LOGGER.info(
|
||||
f"Name: {self.mib_table.get(full_id)[0]}| "
|
||||
f"Description: {self.mib_table.get(full_id)[2]}"
|
||||
f"Name: {self.mib_table.get(full_id).name}| "
|
||||
f"Description: {self.mib_table.get(full_id).description}"
|
||||
)
|
||||
self.mib_table.update({full_id: (name, severity, description, file_name)})
|
||||
self.mib_table.update(
|
||||
{full_id: EventEntry(name, severity, description, file_name)}
|
||||
)
|
||||
self.count = self.count + 1
|
||||
|
||||
@staticmethod
|
||||
@@ -194,56 +203,11 @@ class EventParser(FileParser):
|
||||
def _post_parsing_operation(self):
|
||||
pass
|
||||
|
||||
def __handle_line_reading(self, line, file_name: str):
|
||||
if not self.last_lines[0] == "\n":
|
||||
twolines = self.last_lines[0] + " " + line.strip()
|
||||
else:
|
||||
twolines = ""
|
||||
match1 = re.search(
|
||||
r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
|
||||
)
|
||||
if match1:
|
||||
self.current_id = self.interfaces[match1.group(1)][0]
|
||||
# print( "Current ID: " + str(currentId) )
|
||||
self.my_id = self.return_number_from_string(self.current_id)
|
||||
match = re.search(
|
||||
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
|
||||
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
|
||||
twolines,
|
||||
)
|
||||
if match:
|
||||
if match.group(1):
|
||||
self.last_lines[0] = line
|
||||
return
|
||||
description = " "
|
||||
if match.group(6):
|
||||
description = self.clean_up_description(match.group(6))
|
||||
string_to_add = match.group(2)
|
||||
full_id = (self.my_id * 100) + self.return_number_from_string(
|
||||
match.group(3)
|
||||
)
|
||||
severity = match.group(4)
|
||||
if full_id in self.mib_table:
|
||||
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
|
||||
# " was already in " + self.mib_table[full_id][3])
|
||||
pass
|
||||
if self.obsw_root_path is not None:
|
||||
file_name = os.path.relpath(file_name, self.obsw_root_path)
|
||||
# Replace backslashes with regular slashes
|
||||
file_name.replace("\\", "/")
|
||||
self.mib_table.update(
|
||||
{full_id: (string_to_add, severity, description, file_name)}
|
||||
)
|
||||
self.count = self.count + 1
|
||||
self.last_lines[0] = line
|
||||
|
||||
def build_checked_string(self, first_part, second_part):
|
||||
my_str = first_part + self.convert(second_part)
|
||||
if len(my_str) > 16:
|
||||
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
|
||||
my_str = my_str[0:14]
|
||||
# else:
|
||||
# print( "Entry: " + myStr + " is all right.")
|
||||
return my_str
|
||||
|
||||
@staticmethod
|
||||
@@ -275,92 +239,76 @@ class EventParser(FileParser):
|
||||
return description
|
||||
|
||||
|
||||
def export_to_file(filename: str, event_list: list, file_separator: str):
|
||||
file = open(filename, "w")
|
||||
for entry in event_list:
|
||||
event_id = int(entry[0])
|
||||
event_value = entry[1]
|
||||
event_id_as_hex = f"{event_id:#06x}"
|
||||
file.write(
|
||||
str(event_id)
|
||||
+ file_separator
|
||||
+ event_id_as_hex
|
||||
+ file_separator
|
||||
+ event_value[EVENT_ENTRY_NAME_IDX]
|
||||
+ file_separator
|
||||
+ event_value[EVENT_ENTRY_SEVERITY_IDX]
|
||||
+ file_separator
|
||||
+ event_value[EVENT_ENTRY_INFO_IDX]
|
||||
+ file_separator
|
||||
+ event_value[EVENT_SOURCE_FILE_IDX]
|
||||
+ "\n"
|
||||
def export_to_csv(filename: Path, event_list: EventDictT, col_sep: str):
|
||||
with open(filename, "w") as out:
|
||||
fsep = col_sep
|
||||
out.write(
|
||||
f"Event ID (dec){col_sep} Event ID (hex){col_sep} Name{col_sep} "
|
||||
f"Severity{col_sep} Description{col_sep} File Path\n"
|
||||
)
|
||||
file.close()
|
||||
return
|
||||
for entry in event_list.items():
|
||||
event_id = int(entry[0])
|
||||
event_value = entry[1]
|
||||
event_id_as_hex = f"{event_id:#06x}"
|
||||
out.write(
|
||||
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value.name}{fsep}"
|
||||
f"{event_value.severity}{fsep}{event_value.description}"
|
||||
f"{fsep}{event_value.file_name.as_posix()}\n"
|
||||
)
|
||||
|
||||
|
||||
def write_translation_source_file(
|
||||
event_list: list, date_string: str, filename: str = "translateEvents.cpp"
|
||||
event_list: EventDictT, date_string: str, filename: Path = "translateEvents.cpp"
|
||||
):
|
||||
outputfile = open(filename, "w")
|
||||
definitions = ""
|
||||
# Look up table to avoid duplicate events
|
||||
lut = dict()
|
||||
function = (
|
||||
"const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
|
||||
)
|
||||
for entry in event_list:
|
||||
event_id = entry[0]
|
||||
event_value = entry[1]
|
||||
if event_value[EVENT_ENTRY_NAME_IDX] not in lut:
|
||||
definitions += (
|
||||
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING "
|
||||
f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n'
|
||||
)
|
||||
function += (
|
||||
f" case ({event_id}):\n "
|
||||
f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
|
||||
)
|
||||
lut.update({event_value[EVENT_ENTRY_NAME_IDX] : event_value})
|
||||
function += ' default:\n return "UNKNOWN_EVENT";\n'
|
||||
outputfile.write(
|
||||
f"/**\n * @brief Auto-generated event translation file. "
|
||||
f"Contains {len(event_list)} translations.\n"
|
||||
f" * @details\n"
|
||||
f" * Generated on: {date_string}\n */\n"
|
||||
)
|
||||
outputfile.write('#include "translateEvents.h"\n\n')
|
||||
outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n")
|
||||
outputfile.close()
|
||||
with open(filename, "w") as out:
|
||||
definitions = ""
|
||||
# Look up table to avoid duplicate events
|
||||
lut = dict()
|
||||
function = "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
|
||||
for entry in event_list.items():
|
||||
event_id = entry[0]
|
||||
event_value = entry[1]
|
||||
name = event_value.name
|
||||
if name not in lut:
|
||||
definitions += f"const char *{name}_STRING " f'= "{name}";\n'
|
||||
function += f" case ({event_id}):\n " f"return {name}_STRING;\n"
|
||||
lut.update({name: event_value})
|
||||
function += ' default:\n return "UNKNOWN_EVENT";\n'
|
||||
out.write(
|
||||
f"/**\n * @brief Auto-generated event translation file. "
|
||||
f"Contains {len(event_list)} translations.\n"
|
||||
f" * @details\n"
|
||||
f" * Generated on: {date_string}\n */\n"
|
||||
)
|
||||
out.write('#include "translateEvents.h"\n\n')
|
||||
out.write(definitions + "\n" + function + " }\n return 0;\n}\n")
|
||||
|
||||
|
||||
def write_translation_header_file(filename: str = "translateEvents.h"):
|
||||
file = open(filename, "w")
|
||||
file.write(
|
||||
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
|
||||
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
|
||||
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
|
||||
f"const char *translateEvents(Event event);\n\n"
|
||||
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
|
||||
)
|
||||
def write_translation_header_file(filename: Path = "translateEvents.h"):
|
||||
with open(filename, "w") as out:
|
||||
out.write(
|
||||
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
|
||||
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
|
||||
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
|
||||
f"const char *translateEvents(Event event);\n\n"
|
||||
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
|
||||
)
|
||||
|
||||
|
||||
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
|
||||
def handle_csv_export(file_name: Path, event_list: EventDictT, file_separator: str):
|
||||
"""
|
||||
Generates the CSV in the same directory as the .py file and copes the CSV to another
|
||||
directory if specified.
|
||||
"""
|
||||
export_to_file(
|
||||
filename=file_name, event_list=event_list, file_separator=file_separator
|
||||
)
|
||||
export_to_csv(filename=file_name, event_list=event_list, col_sep=file_separator)
|
||||
|
||||
|
||||
def handle_cpp_export(
|
||||
event_list: list,
|
||||
event_list: EventDictT,
|
||||
date_string: str,
|
||||
file_name: str = "translateEvents.cpp",
|
||||
file_name: Path = "translateEvents.cpp",
|
||||
generate_header: bool = True,
|
||||
header_file_name: str = "translateEvents.h",
|
||||
header_file_name: Path = "translateEvents.h",
|
||||
):
|
||||
write_translation_source_file(
|
||||
event_list=event_list, date_string=date_string, filename=file_name
|
||||
|
97
fsfwgen/logging.py
Normal file
97
fsfwgen/logging.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import logging
|
||||
import sys
|
||||
from typing import Optional
|
||||
from colorlog import ColoredFormatter
|
||||
|
||||
|
||||
FSFWGEN_LOGGER_NAME = "fsfwgen"
|
||||
__CONSOLE_LOGGER_SET_UP: Optional[logging.Logger] = None
|
||||
|
||||
|
||||
def get_console_logger() -> logging.Logger:
|
||||
global __CONSOLE_LOGGER_SET_UP
|
||||
"""Get the global console logger instance. Error logs will still be saved to an error file
|
||||
"""
|
||||
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
|
||||
if not __CONSOLE_LOGGER_SET_UP:
|
||||
__CONSOLE_LOGGER_SET_UP = True
|
||||
__setup_tmtc_console_logger()
|
||||
return logger
|
||||
|
||||
|
||||
def init_console_logger(log_level: int = logging.INFO) -> logging.Logger:
|
||||
global __CONSOLE_LOGGER_SET_UP
|
||||
if not __CONSOLE_LOGGER_SET_UP:
|
||||
__CONSOLE_LOGGER_SET_UP = True
|
||||
return __setup_tmtc_console_logger(log_level=log_level)
|
||||
return get_console_logger()
|
||||
|
||||
|
||||
def __setup_tmtc_console_logger(log_level: int = logging.INFO) -> logging.Logger:
|
||||
"""Sets the LOGGER object which will be used globally. This needs to be called before
|
||||
using the logger.
|
||||
:return: Returns the instance of the global logger
|
||||
"""
|
||||
logger = logging.getLogger(FSFWGEN_LOGGER_NAME)
|
||||
# Use colorlog for now because it allows more flexibility and custom messages
|
||||
# for different levels
|
||||
set_up_colorlog_logger(logger=logger)
|
||||
logger.setLevel(level=log_level)
|
||||
# set_up_coloredlogs_logger(logger=logger)
|
||||
return logger
|
||||
|
||||
|
||||
# Custom formatter. Allows different strings for info, error and debug output
|
||||
class CustomTmtccmdFormatter(ColoredFormatter):
|
||||
def __init__(
|
||||
self, info_fmt: str, dbg_fmt: str, err_fmt: str, warn_fmt: str, datefmt=None
|
||||
):
|
||||
self.err_fmt = err_fmt
|
||||
self.info_fmt = info_fmt
|
||||
self.dbg_fmt = dbg_fmt
|
||||
self.warn_fmt = warn_fmt
|
||||
super().__init__(fmt="%(levelno)d: %(msg)s", datefmt=datefmt, style="%")
|
||||
|
||||
def format(self, record):
|
||||
|
||||
# Save the original format configured by the user
|
||||
# when the logger formatter was instantiated
|
||||
format_orig = self._style._fmt
|
||||
|
||||
# Replace the original format with one customized by logging level
|
||||
if record.levelno == logging.DEBUG:
|
||||
self._style._fmt = self.dbg_fmt
|
||||
|
||||
elif record.levelno == logging.INFO:
|
||||
self._style._fmt = self.info_fmt
|
||||
|
||||
elif record.levelno == logging.ERROR:
|
||||
self._style._fmt = self.err_fmt
|
||||
|
||||
elif record.levelno == logging.WARNING:
|
||||
self._style._fmt = self.warn_fmt
|
||||
|
||||
# Call the original formatter class to do the grunt work
|
||||
result = logging.Formatter.format(self, record)
|
||||
|
||||
# Restore the original format configured by the user
|
||||
self._style._fmt = format_orig
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def set_up_colorlog_logger(logger: logging.Logger):
|
||||
from colorlog import StreamHandler
|
||||
|
||||
dbg_fmt = "%(log_color)s%(levelname)-8s %(cyan)s [%(filename)s:%(lineno)d] %(reset)s%(message)s"
|
||||
custom_formatter = CustomTmtccmdFormatter(
|
||||
info_fmt="%(log_color)s%(levelname)-8s %(cyan)s %(reset)s%(message)s",
|
||||
dbg_fmt=dbg_fmt,
|
||||
err_fmt=dbg_fmt,
|
||||
warn_fmt=dbg_fmt,
|
||||
)
|
||||
|
||||
console_handler = StreamHandler(stream=sys.stdout)
|
||||
console_handler.setFormatter(custom_formatter)
|
||||
logger.addHandler(console_handler)
|
||||
logger.propagate = False
|
@@ -1,7 +1,9 @@
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from fsfwgen.parserbase.parser import FileParser
|
||||
from fsfwgen.core import get_console_logger
|
||||
from fsfwgen.logging import get_console_logger
|
||||
from fsfwgen.utility.sql_writer import SqlWriter
|
||||
|
||||
|
||||
@@ -9,10 +11,10 @@ LOGGER = get_console_logger()
|
||||
|
||||
|
||||
class ObjectDefinitionParser(FileParser):
|
||||
def __init__(self, file_list: list):
|
||||
def __init__(self, file_list: List[Path]):
|
||||
super().__init__(file_list)
|
||||
|
||||
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
||||
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
|
||||
file = open(file_name, "r", encoding="utf-8")
|
||||
for line in file.readlines():
|
||||
match = re.search(r"([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)", line)
|
||||
@@ -21,7 +23,7 @@ class ObjectDefinitionParser(FileParser):
|
||||
|
||||
def _handle_file_parsing_moving_window(
|
||||
self,
|
||||
file_name: str,
|
||||
file_name: Path,
|
||||
current_line: int,
|
||||
moving_window_size: int,
|
||||
moving_window: list,
|
||||
@@ -42,28 +44,27 @@ def export_object_file(filename, object_list, file_separator: str = ","):
|
||||
|
||||
|
||||
def write_translation_file(filename: str, list_of_entries, date_string_full: str):
|
||||
outputfile = open(filename, "w")
|
||||
LOGGER.info("ObjectParser: Writing translation file " + filename)
|
||||
definitions = ""
|
||||
function = (
|
||||
"const char *translateObject(object_id_t object) "
|
||||
"{\n switch ((object & 0xFFFFFFFF)) {\n"
|
||||
)
|
||||
for entry in list_of_entries:
|
||||
# first part of translate file
|
||||
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
|
||||
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
|
||||
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
|
||||
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
|
||||
outputfile.write(
|
||||
f"/**\n * @brief Auto-generated object translation file.\n"
|
||||
f" * @details\n"
|
||||
f" * Contains {len(list_of_entries)} translations.\n"
|
||||
f" * Generated on: {date_string_full}\n */\n"
|
||||
)
|
||||
outputfile.write('#include "translateObjects.h"\n\n')
|
||||
outputfile.write(definitions + "\n" + function + " return 0;\n}\n")
|
||||
outputfile.close()
|
||||
with open(filename, "w") as out:
|
||||
LOGGER.info("ObjectParser: Writing translation file " + filename)
|
||||
definitions = ""
|
||||
function = (
|
||||
"const char *translateObject(object_id_t object) "
|
||||
"{\n switch ((object & 0xFFFFFFFF)) {\n"
|
||||
)
|
||||
for entry in list_of_entries:
|
||||
# first part of translate file
|
||||
definitions += f'const char *{entry[1][0]}_STRING = "{entry[1][0]}";\n'
|
||||
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
|
||||
function += f" case {entry[0]}:\n return {entry[1][0]}_STRING;\n"
|
||||
function += ' default:\n return "UNKNOWN_OBJECT";\n }\n'
|
||||
out.write(
|
||||
f"/**\n * @brief Auto-generated object translation file.\n"
|
||||
f" * @details\n"
|
||||
f" * Contains {len(list_of_entries)} translations.\n"
|
||||
f" * Generated on: {date_string_full}\n */\n"
|
||||
)
|
||||
out.write('#include "translateObjects.h"\n\n')
|
||||
out.write(definitions + "\n" + function + " return 0;\n}\n")
|
||||
|
||||
|
||||
def write_translation_header_file(filename: str = "translateObjects.h"):
|
||||
|
@@ -1,11 +1,11 @@
|
||||
"""Generic File Parser class
|
||||
Used by parse header files. Implemented as class in case header parser becomes more complex
|
||||
"""
|
||||
import os
|
||||
import re
|
||||
from typing import Union
|
||||
from pathlib import Path
|
||||
from typing import Union, List
|
||||
|
||||
from fsfwgen.core import get_console_logger
|
||||
from fsfwgen.logging import get_console_logger
|
||||
from logging import DEBUG
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
@@ -17,11 +17,11 @@ class FileListParser:
|
||||
TODO: Filter functionality for each directory to filter out files or folders
|
||||
"""
|
||||
|
||||
def __init__(self, directory_list_or_name: Union[str, list]):
|
||||
def __init__(self, directory_list_or_name: Union[Path, List[Path]]):
|
||||
self.directory_list = []
|
||||
if isinstance(directory_list_or_name, str):
|
||||
if isinstance(directory_list_or_name, Path):
|
||||
self.directory_list.append(directory_list_or_name)
|
||||
elif isinstance(directory_list_or_name, list):
|
||||
elif isinstance(directory_list_or_name, List):
|
||||
self.directory_list.extend(directory_list_or_name)
|
||||
else:
|
||||
LOGGER.warning(
|
||||
@@ -34,7 +34,7 @@ class FileListParser:
|
||||
search_recursively: bool = False,
|
||||
printout_string: str = "Parsing header files: ",
|
||||
print_current_dir: bool = False,
|
||||
):
|
||||
) -> List[Path]:
|
||||
"""This function is called to get a list of header files
|
||||
:param search_recursively:
|
||||
:param printout_string:
|
||||
@@ -52,30 +52,21 @@ class FileListParser:
|
||||
|
||||
def __get_header_file_list(
|
||||
self,
|
||||
base_directory: str,
|
||||
base_directory: Path,
|
||||
seach_recursively: bool = False,
|
||||
print_current_dir: bool = False,
|
||||
):
|
||||
if base_directory[-1] != "/":
|
||||
base_directory += "/"
|
||||
local_header_files = []
|
||||
if print_current_dir:
|
||||
print("Parsing header files in: " + base_directory)
|
||||
base_list = os.listdir(base_directory)
|
||||
# g.PP.pprint(base_list)
|
||||
for entry in base_list:
|
||||
header_file_match = re.match(r"[_.]*.*\.h", entry)
|
||||
if header_file_match:
|
||||
if os.path.isfile(base_directory + entry):
|
||||
match_string = header_file_match.group(0)
|
||||
if match_string[0] == "." or match_string[0] == "_":
|
||||
pass
|
||||
else:
|
||||
local_header_files.append(base_directory + entry)
|
||||
print(f"Parsing header files in: {base_directory}")
|
||||
for entry in base_directory.iterdir():
|
||||
if (
|
||||
entry.is_file()
|
||||
and entry.suffix == ".h"
|
||||
and entry.as_posix()[0] not in [".", "_"]
|
||||
):
|
||||
local_header_files.append(entry)
|
||||
if seach_recursively:
|
||||
next_path = base_directory + entry
|
||||
if os.path.isdir(next_path):
|
||||
self.__get_header_file_list(next_path, seach_recursively)
|
||||
# print("Files found in: " + base_directory)
|
||||
# g.PP.pprint(local_header_files)
|
||||
if entry.is_dir():
|
||||
self.__get_header_file_list(entry, seach_recursively)
|
||||
self.header_files.extend(local_header_files)
|
||||
|
@@ -14,7 +14,9 @@ Child classes fill out the MIB table (self.mib_table)
|
||||
import enum
|
||||
import re
|
||||
from abc import abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from enum import Enum, auto
|
||||
|
||||
|
||||
class VerbosityLevels(enum.Enum):
|
||||
@@ -23,9 +25,9 @@ class VerbosityLevels(enum.Enum):
|
||||
DEBUG = 2
|
||||
|
||||
|
||||
class FileParserModes(enum.Enum):
|
||||
REGULAR = (enum.auto(),)
|
||||
MOVING_WINDOW = enum.auto()
|
||||
class FileParserModes(Enum):
|
||||
REGULAR = 1
|
||||
MOVING_WINDOW = 2
|
||||
|
||||
|
||||
class FileParser:
|
||||
@@ -39,7 +41,7 @@ class FileParser:
|
||||
will be passed through to the abstract function implementations.
|
||||
"""
|
||||
|
||||
def __init__(self, file_list):
|
||||
def __init__(self, file_list: List[Path]):
|
||||
if len(file_list) == 0:
|
||||
print("File list is empty !")
|
||||
self.file_list_empty = True
|
||||
@@ -111,7 +113,7 @@ class FileParser:
|
||||
return self.mib_table
|
||||
|
||||
@abstractmethod
|
||||
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
||||
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
|
||||
"""
|
||||
Implemented by child class. The developer should fill the info table (self.mib_table)
|
||||
in this routine
|
||||
@@ -125,7 +127,7 @@ class FileParser:
|
||||
@abstractmethod
|
||||
def _handle_file_parsing_moving_window(
|
||||
self,
|
||||
file_name: str,
|
||||
file_name: Path,
|
||||
current_line: int,
|
||||
moving_window_size: int,
|
||||
moving_window: list,
|
||||
@@ -151,7 +153,7 @@ class FileParser:
|
||||
:return:
|
||||
"""
|
||||
|
||||
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
|
||||
def __parse_file_with_moving_window(self, file_name: Path, *args, **kwargs):
|
||||
all_lines = self._open_file(file_name=file_name)
|
||||
moving_window_size = self.__parser_args
|
||||
if moving_window_size == 0:
|
||||
@@ -212,7 +214,7 @@ class FileParser:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _open_file(file_name: str) -> list:
|
||||
def _open_file(file_name: Path) -> list:
|
||||
"""
|
||||
Open a file, attempting common encodings utf-8 and cp1252
|
||||
:param file_name:
|
||||
@@ -222,7 +224,7 @@ class FileParser:
|
||||
file = open(file_name, "r", encoding="utf-8")
|
||||
all_lines = file.readlines()
|
||||
except UnicodeDecodeError:
|
||||
print("ReturnValueParser: Decoding error with file " + file_name)
|
||||
print(f"Parser: Decoding error with file {file_name}")
|
||||
file = open(file_name, "r", encoding="cp1252")
|
||||
all_lines = file.readlines()
|
||||
return all_lines
|
||||
@@ -261,9 +263,7 @@ class FileParser:
|
||||
descrip_match = re.search(
|
||||
r"\[EXPORT][\s]*:[\s]*\[COMMENT]", moving_window[current_idx]
|
||||
)
|
||||
if descrip_match:
|
||||
break
|
||||
if current_idx <= 0:
|
||||
if descrip_match or current_idx <= 0:
|
||||
break
|
||||
current_idx -= 1
|
||||
if descrip_match:
|
||||
|
@@ -1,11 +1,12 @@
|
||||
import os.path
|
||||
import re
|
||||
import sys
|
||||
from typing import List, Tuple, Optional
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Optional, Dict, Union
|
||||
|
||||
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels
|
||||
from fsfwgen.utility.printer import PrettyPrinter
|
||||
from fsfwgen.core import get_console_logger
|
||||
from fsfwgen.logging import get_console_logger
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
@@ -20,14 +21,34 @@ DEFAULT_MOVING_WINDOWS_SIZE = 7
|
||||
INVALID_IF_ID = -1
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileStartHelper:
|
||||
start_name: str
|
||||
start_name_or_value: str
|
||||
count: int
|
||||
cumulative_start_index: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileEndHelper:
|
||||
end_name: str
|
||||
cumulative_end_value: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileConnHelper:
|
||||
file_name: str
|
||||
sh: Optional[FileStartHelper]
|
||||
eh: Optional[FileEndHelper]
|
||||
|
||||
|
||||
class InterfaceParser(FileParser):
|
||||
def __init__(self, file_list: list, print_table: bool = False):
|
||||
super().__init__(file_list)
|
||||
self.print_table = print_table
|
||||
self.file_table_list = []
|
||||
self.file_name_table = []
|
||||
self.start_name_list = []
|
||||
self.end_name_list = []
|
||||
self.file_conn_helpers: Optional[List[FileConnHelper]] = None
|
||||
self._debug_mode = False
|
||||
|
||||
def enable_debug_mode(self, enable: bool):
|
||||
@@ -60,23 +81,28 @@ class InterfaceParser(FileParser):
|
||||
start_matched = False
|
||||
end_matched = False
|
||||
start_name = ""
|
||||
target_end_name = ""
|
||||
first_entry_name_or_index = ""
|
||||
file_conn_entry = FileConnHelper(file_name, None, None)
|
||||
for line in all_lines:
|
||||
if not start_matched:
|
||||
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
|
||||
if match:
|
||||
# current_file_table.update({count: [match.group(1), match.group(2)]})
|
||||
start_name = match.group(1)
|
||||
target_end_name = match.group(2)
|
||||
first_entry_name_or_index = match.group(2)
|
||||
start_matched = True
|
||||
else:
|
||||
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
|
||||
if match:
|
||||
count += 1
|
||||
# It is expected that the last entry is explicitely marked like this.
|
||||
# TODO: Could also simply remember last entry and then designate that as end
|
||||
# entry as soon as "}" is found. Requires moving window mode though
|
||||
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
|
||||
last_entry_name = match.group(1)
|
||||
end_matched = True
|
||||
self.end_name_list.append([last_entry_name, None])
|
||||
file_conn_entry.eh = FileEndHelper(last_entry_name, None)
|
||||
break
|
||||
else:
|
||||
short_name = match.group(2)
|
||||
if short_name == "":
|
||||
@@ -94,48 +120,68 @@ class InterfaceParser(FileParser):
|
||||
"Make sure to use [EXPORT] : [END]"
|
||||
)
|
||||
sys.exit(1)
|
||||
self.start_name_list.append([start_name, target_end_name, None, count])
|
||||
file_conn_entry.sh = FileStartHelper(
|
||||
start_name, first_entry_name_or_index, count, None
|
||||
)
|
||||
if self.file_conn_helpers is None:
|
||||
self.file_conn_helpers = []
|
||||
self.file_conn_helpers.append(file_conn_entry)
|
||||
self.file_name_table.append(file_name)
|
||||
self.file_table_list.append(current_file_table)
|
||||
|
||||
def _post_parsing_operation(self):
|
||||
self.start_name_list, self.end_name_list = self.__assign_start_end_indexes(
|
||||
self.start_name_list, self.end_name_list
|
||||
)
|
||||
self.__assign_start_end_indexes()
|
||||
self._print_start_end_info()
|
||||
for idx, file_table in enumerate(self.file_table_list):
|
||||
self.__build_mod_interface_table(self.start_name_list[idx][2], file_table)
|
||||
self.__build_mod_interface_table(
|
||||
self.file_conn_helpers[idx].sh.cumulative_start_index, file_table
|
||||
)
|
||||
if self.print_table:
|
||||
PrettyPrinter.pprint(self.mib_table)
|
||||
|
||||
@staticmethod
|
||||
def __assign_start_end_indexes(
|
||||
start_name_list_list, end_name_list_list
|
||||
) -> Tuple[List, List]:
|
||||
start_list_list_completed = start_name_list_list
|
||||
end_list_list_completed = end_name_list_list
|
||||
def _print_start_end_info(self):
|
||||
for conn_helper in self.file_conn_helpers:
|
||||
print(
|
||||
f"Detected {conn_helper.sh.count} entries in {conn_helper.file_name}, "
|
||||
f"end index {conn_helper.eh.cumulative_end_value}"
|
||||
)
|
||||
|
||||
def __assign_start_end_indexes(self):
|
||||
conn_helpers_old = self.file_conn_helpers.copy()
|
||||
all_indexes_filled = False
|
||||
max_outer_iterations = 15
|
||||
current_iteration = 0
|
||||
while not all_indexes_filled:
|
||||
for idx, start_name_list in enumerate(start_list_list_completed):
|
||||
if start_name_list[1].isdigit():
|
||||
start_list_list_completed[idx][2] = int(start_name_list[1])
|
||||
end_list_list_completed[idx][1] = (
|
||||
start_list_list_completed[idx][2]
|
||||
+ start_list_list_completed[idx][3]
|
||||
for idx, conn_helper in enumerate(conn_helpers_old):
|
||||
sh = conn_helper.sh
|
||||
# In the very first file, the first index might/will be a number
|
||||
if sh.start_name_or_value.isdigit():
|
||||
sh.cumulative_start_index = int(sh.start_name_or_value)
|
||||
conn_helpers_old[idx].eh.cumulative_end_value = (
|
||||
sh.cumulative_start_index + sh.count
|
||||
)
|
||||
target_end_name = start_name_list[1]
|
||||
for end_name_list in end_list_list_completed:
|
||||
end_name = end_name_list[0]
|
||||
end_value = end_name_list[1]
|
||||
if end_name == target_end_name and end_value is not None:
|
||||
start_list_list_completed[idx][2] = end_value
|
||||
end_list_list_completed[idx][1] = (
|
||||
end_value + start_list_list_completed[idx][3]
|
||||
# Now, we try to connect the start and end of the files using the start and end
|
||||
# names respectively
|
||||
end_name_to_search = conn_helper.sh.start_name_or_value
|
||||
for end_name_helper in conn_helpers_old:
|
||||
eh = end_name_helper.eh
|
||||
if (
|
||||
eh.end_name == end_name_to_search
|
||||
and eh.cumulative_end_value is not None
|
||||
):
|
||||
self.file_conn_helpers[
|
||||
idx
|
||||
].sh.cumulative_start_index = eh.cumulative_end_value
|
||||
self.file_conn_helpers[idx].eh.cumulative_end_value = (
|
||||
eh.cumulative_end_value
|
||||
+ self.file_conn_helpers[idx].sh.count
|
||||
)
|
||||
all_indexes_filled = True
|
||||
for idx, start_name_list in enumerate(start_list_list_completed):
|
||||
if start_name_list[2] is None or end_name_list_list[idx][1] is None:
|
||||
for idx, conn_helper in enumerate(conn_helpers_old):
|
||||
if (
|
||||
conn_helper.sh.cumulative_start_index is None
|
||||
or conn_helper.eh.cumulative_end_value is None
|
||||
):
|
||||
all_indexes_filled = False
|
||||
current_iteration += 1
|
||||
if current_iteration >= max_outer_iterations:
|
||||
@@ -144,7 +190,6 @@ class InterfaceParser(FileParser):
|
||||
"given number of maximum outer iterations!"
|
||||
)
|
||||
sys.exit(1)
|
||||
return start_list_list_completed, end_list_list_completed
|
||||
|
||||
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
|
||||
dict_to_build = dict()
|
||||
@@ -160,6 +205,32 @@ class InterfaceParser(FileParser):
|
||||
self.mib_table.update(dict_to_build)
|
||||
|
||||
|
||||
class RetvalEntry:
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
unique_id: int,
|
||||
file_name: Path,
|
||||
subsystem_name: str,
|
||||
):
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.unique_id = unique_id
|
||||
self.file_name = file_name
|
||||
self.subsystem_name = subsystem_name
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
|
||||
f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
|
||||
f"subsystem_name={self.subsystem_name!r})"
|
||||
)
|
||||
|
||||
|
||||
RetvalDictT = Dict[int, RetvalEntry]
|
||||
|
||||
|
||||
class ReturnValueParser(FileParser):
|
||||
"""
|
||||
Generic return value parser.
|
||||
@@ -173,32 +244,32 @@ class ReturnValueParser(FileParser):
|
||||
self.count = 0
|
||||
# Stores last three lines
|
||||
self.last_lines = ["", "", ""]
|
||||
self.obsw_root_path: Optional[str] = None
|
||||
self.obsw_root_path: Optional[Path] = None
|
||||
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
|
||||
self.return_value_dict.update(
|
||||
{
|
||||
0: (
|
||||
"OK",
|
||||
"System-wide code for ok.",
|
||||
"RETURN_OK",
|
||||
"HasReturnvaluesIF.h",
|
||||
"HasReturnvaluesIF",
|
||||
0: RetvalEntry(
|
||||
name="OK",
|
||||
description="System-wide code for ok.",
|
||||
unique_id=0,
|
||||
file_name=Path("fsfw/returnvalues/returnvalue.h"),
|
||||
subsystem_name="HasReturnvaluesIF",
|
||||
)
|
||||
}
|
||||
)
|
||||
self.return_value_dict.update(
|
||||
{
|
||||
1: (
|
||||
"Failed",
|
||||
"Unspecified system-wide code for failed.",
|
||||
"RETURN_FAILED",
|
||||
"HasReturnvaluesIF.h",
|
||||
"HasReturnvaluesIF",
|
||||
1: RetvalEntry(
|
||||
name="Failed",
|
||||
description="Unspecified system-wide code for failed.",
|
||||
unique_id=1,
|
||||
file_name=Path("fsfw/returnvalues/returnvalue.h"),
|
||||
subsystem_name="HasReturnvaluesIF",
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
||||
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
|
||||
"""Former way to parse returnvalues. Not recommended anymore.
|
||||
:param file_name:
|
||||
:param args:
|
||||
@@ -215,7 +286,7 @@ class ReturnValueParser(FileParser):
|
||||
|
||||
def _handle_file_parsing_moving_window(
|
||||
self,
|
||||
file_name: str,
|
||||
file_name: Path,
|
||||
current_line: int,
|
||||
moving_window_size: int,
|
||||
moving_window: list,
|
||||
@@ -249,7 +320,7 @@ class ReturnValueParser(FileParser):
|
||||
number_match = INVALID_IF_ID
|
||||
# Try to match for a string using the new API first. Example:
|
||||
# static const ReturnValue_t PACKET_TOO_LONG =
|
||||
# HasReturnvaluesIF::makeReturnCode(CLASS_ID, 0);
|
||||
# returnvalue::makeCode(CLASS_ID, 0);
|
||||
returnvalue_match = re.search(
|
||||
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
|
||||
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
|
||||
@@ -263,9 +334,11 @@ class ReturnValueParser(FileParser):
|
||||
full_returnvalue_string,
|
||||
)
|
||||
if returnvalue_match:
|
||||
number_match = returnvalue_match.group(2)
|
||||
number_match = get_number_from_dec_or_hex_str(
|
||||
returnvalue_match.group(2)
|
||||
)
|
||||
else:
|
||||
number_match = returnvalue_match.group(3)
|
||||
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(3))
|
||||
if returnvalue_match:
|
||||
description = self.__search_for_descrip_string(moving_window=moving_window)
|
||||
if number_match == INVALID_IF_ID:
|
||||
@@ -315,12 +388,12 @@ class ReturnValueParser(FileParser):
|
||||
name_match=returnvalue_match.group(1),
|
||||
file_name=file_name,
|
||||
description="",
|
||||
number_match=returnvalue_match.group(2),
|
||||
number_match=get_number_from_dec_or_hex_str(returnvalue_match.group(2)),
|
||||
)
|
||||
self.last_lines[1] = self.last_lines[0]
|
||||
self.last_lines[0] = newline
|
||||
|
||||
def __handle_interfaceid_match(self, interface_id_match, file_name: str) -> bool:
|
||||
def __handle_interfaceid_match(self, interface_id_match, file_name: Path) -> bool:
|
||||
"""Handle a match of an interface ID definition in the code.
|
||||
Returns whether the interface ID was found successfully in the IF ID header files
|
||||
"""
|
||||
@@ -338,7 +411,7 @@ class ReturnValueParser(FileParser):
|
||||
return False
|
||||
self.current_interface_id_entries["Name"] = self.interfaces[
|
||||
interface_id_match.group(1)
|
||||
][1]
|
||||
][1].lstrip()
|
||||
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
|
||||
if self.get_verbosity() == VerbosityLevels.DEBUG:
|
||||
current_id = self.current_interface_id_entries["ID"]
|
||||
@@ -346,7 +419,7 @@ class ReturnValueParser(FileParser):
|
||||
return True
|
||||
|
||||
def __handle_returnvalue_match(
|
||||
self, name_match: str, number_match: str, file_name: str, description: str
|
||||
self, name_match: str, number_match: int, file_name: Path, description: str
|
||||
):
|
||||
string_to_add = self.build_checked_string(
|
||||
self.current_interface_id_entries["Name"],
|
||||
@@ -354,23 +427,21 @@ class ReturnValueParser(FileParser):
|
||||
MAX_STRING_LEN,
|
||||
PRINT_TRUNCATED_ENTRIES,
|
||||
)
|
||||
full_id = (
|
||||
self.current_interface_id_entries["ID"] << 8
|
||||
) + return_number_from_string(number_match)
|
||||
full_id = (self.current_interface_id_entries["ID"] << 8) | number_match
|
||||
if full_id in self.return_value_dict:
|
||||
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
|
||||
# ' was already in ' + self.return_value_dict[full_id][3])
|
||||
pass
|
||||
if self.obsw_root_path is not None:
|
||||
file_name = os.path.relpath(file_name, self.obsw_root_path)
|
||||
dict_tuple = (
|
||||
string_to_add,
|
||||
description,
|
||||
number_match,
|
||||
file_name,
|
||||
self.current_interface_id_entries["FullName"],
|
||||
file_name = file_name.relative_to(self.obsw_root_path)
|
||||
mib_entry = RetvalEntry(
|
||||
name=string_to_add,
|
||||
description=description,
|
||||
unique_id=number_match,
|
||||
file_name=file_name,
|
||||
subsystem_name=self.current_interface_id_entries["FullName"],
|
||||
)
|
||||
self.return_value_dict.update({full_id: dict_tuple})
|
||||
self.return_value_dict.update({full_id: mib_entry})
|
||||
self.count = self.count + 1
|
||||
|
||||
def _post_parsing_operation(self):
|
||||
@@ -379,24 +450,23 @@ class ReturnValueParser(FileParser):
|
||||
self.mib_table = self.return_value_dict
|
||||
|
||||
@staticmethod
|
||||
def export_to_file(filename: str, list_of_entries: dict, file_separator: str):
|
||||
file = open(filename, "w")
|
||||
for entry in list_of_entries.items():
|
||||
file.write(
|
||||
hex(entry[0])
|
||||
+ file_separator
|
||||
+ entry[1][0]
|
||||
+ file_separator
|
||||
+ entry[1][1]
|
||||
+ file_separator
|
||||
+ entry[1][2]
|
||||
+ file_separator
|
||||
+ entry[1][3]
|
||||
+ file_separator
|
||||
+ entry[1][4]
|
||||
+ "\n"
|
||||
def export_to_csv(filename: Path, list_of_entries: RetvalDictT, column_sep: str):
|
||||
with open(filename, "w") as out:
|
||||
out.write(
|
||||
f"Full ID (hex){column_sep} Name{column_sep} Description{column_sep} "
|
||||
f"Unique ID{column_sep} Subsytem Name{column_sep} File Path\n"
|
||||
)
|
||||
file.close()
|
||||
for entry in list_of_entries.items():
|
||||
if column_sep == ";":
|
||||
entry[1].description = entry[1].description.replace(";", ",")
|
||||
elif column_sep == ",":
|
||||
# Quote the description
|
||||
entry[1].description = f'"{entry[1].description}"'
|
||||
out.write(
|
||||
f"{entry[0]:#06x}{column_sep}{entry[1].name}{column_sep}{entry[1].description}"
|
||||
f"{column_sep}{entry[1].unique_id}{column_sep}{entry[1].subsystem_name}"
|
||||
f"{column_sep}{entry[1].file_name.as_posix()}\n"
|
||||
)
|
||||
|
||||
def build_checked_string(
|
||||
self,
|
||||
@@ -434,7 +504,7 @@ class ReturnValueParser(FileParser):
|
||||
return description
|
||||
|
||||
|
||||
def return_number_from_string(a_string):
|
||||
def get_number_from_dec_or_hex_str(a_string):
|
||||
if a_string.startswith("0x"):
|
||||
return int(a_string, 16)
|
||||
if a_string.isdigit():
|
||||
|
@@ -1,3 +1,5 @@
|
||||
from pathlib import Path
|
||||
|
||||
from fsfwgen.utility.file_management import copy_file, move_file
|
||||
|
||||
|
||||
@@ -5,7 +7,7 @@ from fsfwgen.utility.file_management import copy_file, move_file
|
||||
class CsvWriter:
|
||||
def __init__(
|
||||
self,
|
||||
filename: str,
|
||||
filename: Path,
|
||||
table_to_print=None,
|
||||
header_array=None,
|
||||
file_separator: str = ",",
|
||||
@@ -40,11 +42,11 @@ class CsvWriter:
|
||||
file.write(str(entry[columnIndex]) + "\n")
|
||||
file.close()
|
||||
|
||||
def copy_csv(self, copy_destination: str = "."):
|
||||
def copy_csv(self, copy_destination: Path = "."):
|
||||
copy_file(self.filename, copy_destination)
|
||||
print("CSV file was copied to " + copy_destination)
|
||||
print(f"CSV file was copied to {copy_destination}")
|
||||
|
||||
def move_csv(self, move_destination: str):
|
||||
def move_csv(self, move_destination: Path):
|
||||
move_file(self.filename, move_destination)
|
||||
if move_destination == ".." or move_destination == "../":
|
||||
print("CSV Writer: CSV file was moved to parser root directory")
|
||||
|
@@ -1,12 +1,16 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import shutil
|
||||
import os
|
||||
from fsfwgen.core import get_console_logger
|
||||
from pathlib import Path
|
||||
|
||||
from fsfwgen.logging import get_console_logger
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
|
||||
def copy_file(filename: str, destination: str = "", delete_existing_file: bool = False):
|
||||
def copy_file(
|
||||
filename: Path, destination: Path = "", delete_existing_file: bool = False
|
||||
):
|
||||
if not os.path.exists(filename):
|
||||
LOGGER.warning(f"File {filename} does not exist")
|
||||
return
|
||||
@@ -24,7 +28,7 @@ def copy_file(filename: str, destination: str = "", delete_existing_file: bool =
|
||||
LOGGER.exception("Source and destination are the same!")
|
||||
|
||||
|
||||
def move_file(file_name: str, destination: str = ""):
|
||||
def move_file(file_name: Path, destination: Path = ""):
|
||||
if not os.path.exists(file_name):
|
||||
print(f"move_file: File {file_name} does not exist")
|
||||
return
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import sqlite3
|
||||
|
||||
from fsfwgen.core import get_console_logger
|
||||
from fsfwgen.logging import get_console_logger
|
||||
|
||||
LOGGER = get_console_logger()
|
||||
|
||||
|
Reference in New Issue
Block a user