fsfwgen/fsfwgen/returnvalues/returnvalues_parser.py

514 lines
19 KiB
Python
Raw Normal View History

import logging
2021-06-08 12:37:10 +02:00
import re
import sys
2022-08-09 10:17:31 +02:00
from dataclasses import dataclass
2022-06-20 18:02:46 +02:00
from pathlib import Path
from typing import List, Optional, Dict
2021-06-08 12:37:10 +02:00
2021-08-02 12:49:10 +02:00
from fsfwgen.parserbase.parser import FileParser, VerbosityLevels
2021-06-08 17:10:24 +02:00
from fsfwgen.utility.printer import PrettyPrinter
2021-08-02 12:49:10 +02:00
_LOGGER = logging.getLogger(__name__)
2021-06-08 12:37:10 +02:00
# Intermediate solution
MAX_STRING_LEN = 80
PRINT_TRUNCATED_ENTRIES = False
DEBUG_FOR_FILE_NAME = False
CLASS_ID_NAMESPACE = "CLASS_ID"
DEFAULT_MOVING_WINDOWS_SIZE = 7
2021-08-02 12:49:10 +02:00
INVALID_IF_ID = -1
2021-06-08 12:37:10 +02:00
2022-08-09 10:17:31 +02:00
@dataclass
class FileStartHelper:
start_name: str
start_name_or_value: str
count: int
cumulative_start_index: Optional[int] = None
@dataclass
class FileEndHelper:
end_name: str
cumulative_end_value: Optional[int] = None
@dataclass
class FileConnHelper:
file_name: str
sh: Optional[FileStartHelper]
eh: Optional[FileEndHelper]
2021-06-08 12:37:10 +02:00
class InterfaceParser(FileParser):
def __init__(self, file_list: list, print_table: bool = False):
super().__init__(file_list)
self.print_table = print_table
self.file_table_list = []
self.file_name_table = []
2022-08-09 10:17:31 +02:00
self.file_conn_helpers: Optional[List[FileConnHelper]] = None
2021-08-02 12:49:10 +02:00
self._debug_mode = False
def enable_debug_mode(self, enable: bool):
self._debug_mode = enable
2021-06-08 12:37:10 +02:00
2021-06-21 12:52:01 +02:00
def _handle_file_parsing_moving_window(
self,
file_name: str,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
2021-06-21 12:52:01 +02:00
):
2021-06-08 12:37:10 +02:00
pass
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
self.file_name_table.append(file_name)
try:
file = open(file_name, "r", encoding="utf-8")
2021-06-08 12:37:10 +02:00
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, "r", encoding="cp1252")
2021-06-08 12:37:10 +02:00
all_lines = file.readlines()
self.__handle_regular_class_id_parsing(file_name=file_name, all_lines=all_lines)
def __handle_regular_class_id_parsing(self, file_name: str, all_lines: List[str]):
count = 0
current_file_table = dict()
start_matched = False
end_matched = False
start_name = ""
2022-08-09 10:17:31 +02:00
first_entry_name_or_index = ""
file_conn_entry = FileConnHelper(file_name, None, None)
2021-06-08 12:37:10 +02:00
for line in all_lines:
if not start_matched:
match = re.search(r"[\s]*([\w]*) = [\s]*([\w]*)", line)
2021-06-08 12:37:10 +02:00
if match:
# current_file_table.update({count: [match.group(1), match.group(2)]})
start_name = match.group(1)
2022-08-09 10:17:31 +02:00
first_entry_name_or_index = match.group(2)
2021-06-08 12:37:10 +02:00
start_matched = True
else:
match = re.search(r"[\s]*([\w]*),?(?:[\s]*//)?([^\n]*)?", line)
2021-06-08 12:37:10 +02:00
if match:
count += 1
2022-08-09 10:17:31 +02:00
# It is expected that the last entry is explicitely marked like this.
# TODO: Could also simply remember last entry and then designate that as end
# entry as soon as "}" is found. Requires moving window mode though
2021-06-21 12:52:01 +02:00
if re.search(r"\[EXPORT][\s]*:[\s]*\[END]", match.group(2)):
2021-06-08 12:37:10 +02:00
last_entry_name = match.group(1)
end_matched = True
2022-08-09 10:17:31 +02:00
file_conn_entry.eh = FileEndHelper(last_entry_name, None)
break
2021-06-08 12:37:10 +02:00
else:
short_name = match.group(2)
if short_name == "":
short_name = match.group(1)[0:3]
current_file_table.update({count: [match.group(1), short_name]})
if not start_matched:
print("No start match detected when parsing interface files..")
print(
f"Current file: {file_name} | Make sure to include a start definition"
)
2021-06-08 12:37:10 +02:00
sys.exit(1)
if not end_matched:
2021-06-21 12:52:01 +02:00
print(
"No end match detected when parsing interface files. "
"Make sure to use [EXPORT] : [END]"
)
2021-06-08 12:37:10 +02:00
sys.exit(1)
2022-08-09 10:17:31 +02:00
file_conn_entry.sh = FileStartHelper(
start_name, first_entry_name_or_index, count, None
)
if self.file_conn_helpers is None:
self.file_conn_helpers = []
self.file_conn_helpers.append(file_conn_entry)
2021-06-08 12:37:10 +02:00
self.file_name_table.append(file_name)
self.file_table_list.append(current_file_table)
def _post_parsing_operation(self):
2022-08-09 10:17:31 +02:00
self.__assign_start_end_indexes()
self._print_start_end_info()
2021-06-08 12:37:10 +02:00
for idx, file_table in enumerate(self.file_table_list):
2022-08-09 10:17:31 +02:00
self.__build_mod_interface_table(
2022-08-09 10:37:50 +02:00
self.file_conn_helpers[idx].sh.cumulative_start_index, file_table
2022-08-09 10:17:31 +02:00
)
2021-06-08 12:37:10 +02:00
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
2022-08-09 10:17:31 +02:00
def _print_start_end_info(self):
for conn_helper in self.file_conn_helpers:
print(
f"Detected {conn_helper.sh.count} entries in {conn_helper.file_name}, "
f"end index {conn_helper.eh.cumulative_end_value}"
)
def __assign_start_end_indexes(self):
conn_helpers_old = self.file_conn_helpers.copy()
2021-06-08 12:37:10 +02:00
all_indexes_filled = False
max_outer_iterations = 15
current_iteration = 0
while not all_indexes_filled:
2022-08-09 10:17:31 +02:00
for idx, conn_helper in enumerate(conn_helpers_old):
sh = conn_helper.sh
# In the very first file, the first index might/will be a number
if sh.start_name_or_value.isdigit():
sh.cumulative_start_index = int(sh.start_name_or_value)
conn_helpers_old[idx].eh.cumulative_end_value = (
sh.cumulative_start_index + sh.count
)
2022-08-09 10:17:31 +02:00
# Now, we try to connect the start and end of the files using the start and end
# names respectively
end_name_to_search = conn_helper.sh.start_name_or_value
for end_name_helper in conn_helpers_old:
eh = end_name_helper.eh
if (
eh.end_name == end_name_to_search
and eh.cumulative_end_value is not None
):
self.file_conn_helpers[
idx
].sh.cumulative_start_index = eh.cumulative_end_value
self.file_conn_helpers[idx].eh.cumulative_end_value = (
eh.cumulative_end_value
+ self.file_conn_helpers[idx].sh.count
)
2021-06-08 12:37:10 +02:00
all_indexes_filled = True
2022-08-09 10:17:31 +02:00
for idx, conn_helper in enumerate(conn_helpers_old):
if (
conn_helper.sh.cumulative_start_index is None
or conn_helper.eh.cumulative_end_value is None
):
2021-06-08 12:37:10 +02:00
all_indexes_filled = False
current_iteration += 1
if current_iteration >= max_outer_iterations:
2021-06-21 12:52:01 +02:00
print(
"Could not fill out start and end index list in "
"given number of maximum outer iterations!"
)
2021-06-08 12:37:10 +02:00
sys.exit(1)
def __build_mod_interface_table(self, count_start: int, interface_dict: dict):
dict_to_build = dict()
for local_count, interface_name_and_shortname in interface_dict.items():
dict_to_build.update(
{
interface_name_and_shortname[0]: [
local_count + count_start,
interface_name_and_shortname[1],
]
}
2021-06-08 12:37:10 +02:00
)
self.mib_table.update(dict_to_build)
2022-06-20 18:02:46 +02:00
class RetvalEntry:
def __init__(
self,
name: str,
description: str,
unique_id: int,
file_name: Path,
subsystem_name: str,
):
self.name = name
self.description = description
self.unique_id = unique_id
self.file_name = file_name
self.subsystem_name = subsystem_name
def __repr__(self):
return (
f"RetvalEntry(name={self.name!r}, description={self.description!r}, "
f"unique_id={self.unique_id!r}, file_name={self.file_name!r}, "
f"subsystem_name={self.subsystem_name!r})"
2022-06-20 18:02:46 +02:00
)
RetvalDictT = Dict[int, RetvalEntry]
2021-06-08 12:37:10 +02:00
class ReturnValueParser(FileParser):
"""
Generic return value parser.
"""
2021-06-08 12:37:10 +02:00
def __init__(self, interfaces, file_list, print_tables):
super().__init__(file_list)
self.print_tables = print_tables
self.interfaces = interfaces
self.return_value_dict = dict()
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
2022-06-20 18:02:46 +02:00
self.obsw_root_path: Optional[Path] = None
self.current_interface_id_entries = {"Name": "", "ID": 0, "FullName": ""}
self.return_value_dict.update(
{
2022-06-20 18:02:46 +02:00
0: RetvalEntry(
name="OK",
description="System-wide code for ok.",
unique_id=0,
2022-08-24 17:26:49 +02:00
file_name=Path("fsfw/returnvalues/returnvalue.h"),
2022-06-20 18:02:46 +02:00
subsystem_name="HasReturnvaluesIF",
)
}
)
self.return_value_dict.update(
{
2022-06-20 18:02:46 +02:00
1: RetvalEntry(
name="Failed",
description="Unspecified system-wide code for failed.",
unique_id=1,
2022-08-24 17:26:49 +02:00
file_name=Path("fsfw/returnvalues/returnvalue.h"),
2022-06-20 18:02:46 +02:00
subsystem_name="HasReturnvaluesIF",
)
}
)
2021-06-08 12:37:10 +02:00
2022-06-20 18:02:46 +02:00
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
2021-06-21 12:47:03 +02:00
"""Former way to parse returnvalues. Not recommended anymore.
2021-06-08 12:37:10 +02:00
:param file_name:
:param args:
:param kwargs:
:return:
"""
if len(args) > 0:
print_truncated_entries = args[0]
else:
print_truncated_entries = False
all_lines = self._open_file(file_name=file_name)
for line in all_lines:
self.__handle_line_reading(line, file_name, print_truncated_entries)
2021-06-21 12:47:03 +02:00
def _handle_file_parsing_moving_window(
self,
2022-06-20 18:02:46 +02:00
file_name: Path,
current_line: int,
moving_window_size: int,
moving_window: list,
*args,
**kwargs,
2021-06-21 12:47:03 +02:00
):
"""Parse for returnvalues using a moving window"""
2021-06-08 12:37:10 +02:00
interface_id_match = re.search(
rf"{CLASS_ID_NAMESPACE}::([a-zA-Z_0-9]*)",
moving_window[self._moving_window_center_idx],
2021-06-08 12:37:10 +02:00
)
2021-06-08 12:37:10 +02:00
if interface_id_match:
2021-06-21 12:47:03 +02:00
self.__handle_interfaceid_match(
interface_id_match=interface_id_match, file_name=file_name
)
2021-06-08 12:37:10 +02:00
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)?[\s]*ReturnValue_t[\s]*([\w]*)[\s]*=[\s]*((?!;).*$)",
moving_window[self._moving_window_center_idx],
re.DOTALL,
2021-06-08 12:37:10 +02:00
)
full_returnvalue_string = ""
if returnvalue_match:
if ";" in returnvalue_match.group(0):
full_returnvalue_string = returnvalue_match.group(0)
else:
full_returnvalue_string = self.__build_multi_line_returnvalue_string(
2021-06-21 12:47:03 +02:00
moving_window=moving_window,
first_line=moving_window[self._moving_window_center_idx],
2021-06-08 12:37:10 +02:00
)
2021-08-02 12:49:10 +02:00
number_match = INVALID_IF_ID
# Try to match for a string using the new API first. Example:
# static const ReturnValue_t PACKET_TOO_LONG =
2022-08-24 17:26:49 +02:00
# returnvalue::makeCode(CLASS_ID, 0);
2021-06-08 12:37:10 +02:00
returnvalue_match = re.search(
2021-08-02 12:49:10 +02:00
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([\w]*)[\s]*"
2021-06-21 12:47:03 +02:00
r"=[\s]*.*::[\w]*\(([\w]*),[\s]*([\d]*)\)",
full_returnvalue_string,
2021-06-08 12:37:10 +02:00
)
if not returnvalue_match:
2021-08-02 12:49:10 +02:00
# Try to match for old API using MAE_RETURN_CODE macro
2021-06-08 12:37:10 +02:00
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([\w]*)[\s]*\)",
full_returnvalue_string,
2021-06-08 12:37:10 +02:00
)
2021-08-02 12:49:10 +02:00
if returnvalue_match:
2022-06-20 18:02:46 +02:00
number_match = get_number_from_dec_or_hex_str(
returnvalue_match.group(2)
)
2021-08-02 12:49:10 +02:00
else:
2022-06-20 18:02:46 +02:00
number_match = get_number_from_dec_or_hex_str(returnvalue_match.group(3))
2021-06-08 12:37:10 +02:00
if returnvalue_match:
description = self.__search_for_descrip_string(moving_window=moving_window)
2021-08-02 12:49:10 +02:00
if number_match == INVALID_IF_ID:
_LOGGER.warning(f"Invalid number match detected for file {file_name}")
_LOGGER.warning(f"Match groups:")
2021-08-02 12:49:10 +02:00
for group in returnvalue_match.groups():
_LOGGER.info(group)
2021-06-08 12:37:10 +02:00
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1),
file_name=file_name,
number_match=number_match,
description=description,
2021-06-08 12:37:10 +02:00
)
def __build_multi_line_returnvalue_string(
self, first_line: str, moving_window: List[str]
2021-06-08 12:37:10 +02:00
) -> str:
2021-06-21 12:47:03 +02:00
return self._build_multi_line_string_generic(
first_line=first_line, moving_window=moving_window
)
2021-06-08 12:37:10 +02:00
def __search_for_descrip_string(self, moving_window: List[str]) -> str:
2021-06-21 12:47:03 +02:00
return self._search_for_descrip_string_generic(
moving_window=moving_window,
break_pattern=r"^[\s]*static const(?:expr)? ReturnValue_t",
2021-06-08 12:37:10 +02:00
)
2021-06-08 12:37:10 +02:00
def __handle_line_reading(self, line, file_name, print_truncated_entries: bool):
newline = line
if self.last_lines[0] != "\n":
two_lines = self.last_lines[0] + " " + newline.strip()
2021-06-08 12:37:10 +02:00
else:
two_lines = ""
interface_id_match = re.search(
r"INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)", two_lines
)
2021-06-08 12:37:10 +02:00
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match, file_name=file_name)
returnvalue_match = re.search(
r"^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*"
r"MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)",
two_lines,
2021-06-08 12:37:10 +02:00
)
if returnvalue_match:
2021-06-21 12:52:01 +02:00
self.__handle_returnvalue_match(
name_match=returnvalue_match.group(1),
file_name=file_name,
description="",
2022-06-20 18:02:46 +02:00
number_match=get_number_from_dec_or_hex_str(returnvalue_match.group(2)),
2021-06-21 12:52:01 +02:00
)
2021-06-08 12:37:10 +02:00
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
2022-06-20 18:02:46 +02:00
def __handle_interfaceid_match(self, interface_id_match, file_name: Path) -> bool:
2021-08-02 12:49:10 +02:00
"""Handle a match of an interface ID definition in the code.
Returns whether the interface ID was found successfully in the IF ID header files
"""
if self.get_verbosity() == VerbosityLevels.DEBUG:
_LOGGER.info(
f"Interface ID {interface_id_match.group(1)} found in {file_name}"
)
2021-08-02 12:49:10 +02:00
if_id_entry = self.interfaces.get(interface_id_match.group(1))
if if_id_entry is not None:
self.current_interface_id_entries["ID"] = if_id_entry[0]
else:
_LOGGER.warning(
f"Interface ID {interface_id_match.group(1)} not found in IF ID dictionary"
2021-08-02 12:49:10 +02:00
)
return False
self.current_interface_id_entries["Name"] = self.interfaces[
interface_id_match.group(1)
2022-06-20 18:02:46 +02:00
][1].lstrip()
2021-06-08 12:37:10 +02:00
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
2021-08-02 12:49:10 +02:00
if self.get_verbosity() == VerbosityLevels.DEBUG:
2021-06-08 12:37:10 +02:00
current_id = self.current_interface_id_entries["ID"]
_LOGGER.info(f"Current ID: {current_id}")
2021-08-02 12:49:10 +02:00
return True
2021-06-08 12:37:10 +02:00
def __handle_returnvalue_match(
2022-06-20 18:02:46 +02:00
self, name_match: str, number_match: int, file_name: Path, description: str
2021-06-08 12:37:10 +02:00
):
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"],
name_match,
MAX_STRING_LEN,
PRINT_TRUNCATED_ENTRIES,
2021-06-08 12:37:10 +02:00
)
2022-06-20 18:02:46 +02:00
full_id = (self.current_interface_id_entries["ID"] << 8) | number_match
2021-06-08 12:37:10 +02:00
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
2022-03-04 10:37:30 +01:00
if self.obsw_root_path is not None:
2022-06-20 18:02:46 +02:00
file_name = file_name.relative_to(self.obsw_root_path)
mib_entry = RetvalEntry(
name=string_to_add,
description=description,
unique_id=number_match,
file_name=file_name,
subsystem_name=self.current_interface_id_entries["FullName"],
2021-06-08 12:37:10 +02:00
)
2022-06-20 18:02:46 +02:00
self.return_value_dict.update({full_id: mib_entry})
2021-06-08 12:37:10 +02:00
self.count = self.count + 1
def _post_parsing_operation(self):
if self.print_tables:
PrettyPrinter.pprint(self.return_value_dict)
self.mib_table = self.return_value_dict
@staticmethod
2022-06-21 01:21:01 +02:00
def export_to_csv(filename: Path, list_of_entries: RetvalDictT, column_sep: str):
with open(filename, "w") as out:
out.write(
f"Full ID (hex){column_sep} Name{column_sep} Description{column_sep} "
f"Unique ID{column_sep} Subsytem Name{column_sep} File Path\n"
2021-06-21 12:47:03 +02:00
)
2022-06-21 01:21:01 +02:00
for entry in list_of_entries.items():
if column_sep == ";":
entry[1].description = entry[1].description.replace(";", ",")
elif column_sep == ",":
# Quote the description
entry[1].description = f'"{entry[1].description}"'
out.write(
f"{entry[0]:#06x}{column_sep}{entry[1].name}{column_sep}{entry[1].description}"
f"{column_sep}{entry[1].unique_id}{column_sep}{entry[1].subsystem_name}"
f"{column_sep}{entry[1].file_name.as_posix()}\n"
)
2021-06-08 12:37:10 +02:00
def build_checked_string(
self,
first_part,
second_part,
max_string_len: int,
print_truncated_entries: bool,
):
"""Build a checked string"""
my_str = first_part + "_" + self.convert(second_part)
2021-06-08 12:37:10 +02:00
if len(my_str) > max_string_len:
if print_truncated_entries:
_LOGGER.warning(f"Entry {my_str} too long. Will truncate.")
2021-06-08 12:37:10 +02:00
my_str = my_str[0:max_string_len]
else:
# print("Entry: " + myStr + " is all right.")
pass
return my_str
@staticmethod
def convert(name):
single_strings = name.split("_")
new_string = ""
2021-06-08 12:37:10 +02:00
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(descr_string):
description = descr_string.lstrip("!<- ")
if description == "":
description = " "
2021-06-08 12:37:10 +02:00
return description
2022-06-20 18:02:46 +02:00
def get_number_from_dec_or_hex_str(a_string):
if a_string.startswith("0x"):
2021-06-08 12:37:10 +02:00
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
_LOGGER.warning(f"Illegal number representation: {a_string}")
2021-06-08 12:37:10 +02:00
return 0