Refactor Path handling
This commit is contained in:
parent
6d423f7106
commit
fc191cc50e
@ -1,6 +1,7 @@
|
|||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
from typing import List
|
from pathlib import Path
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
from fsfwgen.parserbase.parser import FileParser
|
from fsfwgen.parserbase.parser import FileParser
|
||||||
from fsfwgen.core import get_console_logger
|
from fsfwgen.core import get_console_logger
|
||||||
@ -50,17 +51,17 @@ class SubsystemDefinitionParser(FileParser):
|
|||||||
|
|
||||||
|
|
||||||
class EventParser(FileParser):
|
class EventParser(FileParser):
|
||||||
def __init__(self, file_list: List[str], interface_list):
|
def __init__(self, file_list: List[Path], interface_list):
|
||||||
super().__init__(file_list)
|
super().__init__(file_list)
|
||||||
self.interfaces = interface_list
|
self.interfaces = interface_list
|
||||||
self.count = 0
|
self.count = 0
|
||||||
self.my_id = 0
|
self.my_id = 0
|
||||||
self.current_id = 0
|
self.current_id = 0
|
||||||
self.obsw_root_path = None
|
self.obsw_root_path: Optional[Path] = None
|
||||||
self.last_lines = ["", "", ""]
|
self.last_lines = ["", "", ""]
|
||||||
self.moving_window_center_idx = 3
|
self.moving_window_center_idx = 3
|
||||||
|
|
||||||
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
|
def _handle_file_parsing(self, file_name: Path, *args: any, **kwargs):
|
||||||
try:
|
try:
|
||||||
file = open(file_name, "r", encoding="utf-8")
|
file = open(file_name, "r", encoding="utf-8")
|
||||||
all_lines = file.readlines()
|
all_lines = file.readlines()
|
||||||
@ -71,13 +72,13 @@ class EventParser(FileParser):
|
|||||||
for line in all_lines:
|
for line in all_lines:
|
||||||
self.__handle_line_reading(line, file_name)
|
self.__handle_line_reading(line, file_name)
|
||||||
if self.count > 0:
|
if self.count > 0:
|
||||||
print("File " + file_name + " contained " + str(self.count) + " events.")
|
print(f"File {file_name} contained {self.count} events.")
|
||||||
total_count += self.count
|
total_count += self.count
|
||||||
self.count = 0
|
self.count = 0
|
||||||
|
|
||||||
def _handle_file_parsing_moving_window(
|
def _handle_file_parsing_moving_window(
|
||||||
self,
|
self,
|
||||||
file_name: str,
|
file_name: Path,
|
||||||
current_line: int,
|
current_line: int,
|
||||||
moving_window_size: int,
|
moving_window_size: int,
|
||||||
moving_window: list,
|
moving_window: list,
|
||||||
@ -127,7 +128,7 @@ class EventParser(FileParser):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def __handle_event_match(
|
def __handle_event_match(
|
||||||
self, event_match, macro_api_match: bool, moving_window: list, file_name: str
|
self, event_match, macro_api_match: bool, moving_window: list, file_name: Path
|
||||||
):
|
):
|
||||||
if ";" in event_match.group(0):
|
if ";" in event_match.group(0):
|
||||||
event_full_match = self.__generate_regex_event_match(
|
event_full_match = self.__generate_regex_event_match(
|
||||||
@ -165,7 +166,9 @@ class EventParser(FileParser):
|
|||||||
f"Name: {self.mib_table.get(full_id)[0]}| "
|
f"Name: {self.mib_table.get(full_id)[0]}| "
|
||||||
f"Description: {self.mib_table.get(full_id)[2]}"
|
f"Description: {self.mib_table.get(full_id)[2]}"
|
||||||
)
|
)
|
||||||
self.mib_table.update({full_id: (name, severity, description, file_name)})
|
self.mib_table.update(
|
||||||
|
{full_id: (name, severity, description, file_name.as_posix())}
|
||||||
|
)
|
||||||
self.count = self.count + 1
|
self.count = self.count + 1
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -194,7 +197,7 @@ class EventParser(FileParser):
|
|||||||
def _post_parsing_operation(self):
|
def _post_parsing_operation(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def __handle_line_reading(self, line, file_name: str):
|
def __handle_line_reading(self, line, file_name: Path):
|
||||||
if not self.last_lines[0] == "\n":
|
if not self.last_lines[0] == "\n":
|
||||||
twolines = self.last_lines[0] + " " + line.strip()
|
twolines = self.last_lines[0] + " " + line.strip()
|
||||||
else:
|
else:
|
||||||
@ -228,9 +231,7 @@ class EventParser(FileParser):
|
|||||||
# " was already in " + self.mib_table[full_id][3])
|
# " was already in " + self.mib_table[full_id][3])
|
||||||
pass
|
pass
|
||||||
if self.obsw_root_path is not None:
|
if self.obsw_root_path is not None:
|
||||||
file_name = os.path.relpath(file_name, self.obsw_root_path)
|
file_name = file_name.relative_to(self.obsw_root_path)
|
||||||
# Replace backslashes with regular slashes
|
|
||||||
file_name.replace("\\", "/")
|
|
||||||
self.mib_table.update(
|
self.mib_table.update(
|
||||||
{full_id: (string_to_add, severity, description, file_name)}
|
{full_id: (string_to_add, severity, description, file_name)}
|
||||||
)
|
)
|
||||||
@ -275,25 +276,19 @@ class EventParser(FileParser):
|
|||||||
return description
|
return description
|
||||||
|
|
||||||
|
|
||||||
def export_to_file(filename: str, event_list: list, file_separator: str):
|
def export_to_file(
|
||||||
|
filename: str, event_list: List[Tuple[str, List]], file_separator: str
|
||||||
|
):
|
||||||
file = open(filename, "w")
|
file = open(filename, "w")
|
||||||
|
fsep = file_separator
|
||||||
for entry in event_list:
|
for entry in event_list:
|
||||||
event_id = int(entry[0])
|
event_id = int(entry[0])
|
||||||
event_value = entry[1]
|
event_value = entry[1]
|
||||||
event_id_as_hex = f"{event_id:#06x}"
|
event_id_as_hex = f"{event_id:#06x}"
|
||||||
file.write(
|
file.write(
|
||||||
str(event_id)
|
f"{event_id}{fsep}{event_id_as_hex}{fsep}{event_value[EVENT_ENTRY_NAME_IDX]}{fsep}"
|
||||||
+ file_separator
|
f"{event_value[EVENT_ENTRY_SEVERITY_IDX]}{fsep}{event_value[EVENT_ENTRY_INFO_IDX]}"
|
||||||
+ event_id_as_hex
|
f"{fsep}{event_value[EVENT_SOURCE_FILE_IDX]}\n"
|
||||||
+ file_separator
|
|
||||||
+ event_value[EVENT_ENTRY_NAME_IDX]
|
|
||||||
+ file_separator
|
|
||||||
+ event_value[EVENT_ENTRY_SEVERITY_IDX]
|
|
||||||
+ file_separator
|
|
||||||
+ event_value[EVENT_ENTRY_INFO_IDX]
|
|
||||||
+ file_separator
|
|
||||||
+ event_value[EVENT_SOURCE_FILE_IDX]
|
|
||||||
+ "\n"
|
|
||||||
)
|
)
|
||||||
file.close()
|
file.close()
|
||||||
return
|
return
|
||||||
|
@ -3,7 +3,8 @@ Used by parse header files. Implemented as class in case header parser becomes m
|
|||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from typing import Union
|
from pathlib import Path
|
||||||
|
from typing import Union, List
|
||||||
|
|
||||||
from fsfwgen.core import get_console_logger
|
from fsfwgen.core import get_console_logger
|
||||||
|
|
||||||
@ -17,11 +18,11 @@ class FileListParser:
|
|||||||
TODO: Filter functionality for each directory to filter out files or folders
|
TODO: Filter functionality for each directory to filter out files or folders
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, directory_list_or_name: Union[str, list]):
|
def __init__(self, directory_list_or_name: Union[Path, List[Path]]):
|
||||||
self.directory_list = []
|
self.directory_list = []
|
||||||
if isinstance(directory_list_or_name, str):
|
if isinstance(directory_list_or_name, Path):
|
||||||
self.directory_list.append(directory_list_or_name)
|
self.directory_list.append(directory_list_or_name)
|
||||||
elif isinstance(directory_list_or_name, list):
|
elif isinstance(directory_list_or_name, List):
|
||||||
self.directory_list.extend(directory_list_or_name)
|
self.directory_list.extend(directory_list_or_name)
|
||||||
else:
|
else:
|
||||||
LOGGER.warning(
|
LOGGER.warning(
|
||||||
@ -34,7 +35,7 @@ class FileListParser:
|
|||||||
search_recursively: bool = False,
|
search_recursively: bool = False,
|
||||||
printout_string: str = "Parsing header files: ",
|
printout_string: str = "Parsing header files: ",
|
||||||
print_current_dir: bool = False,
|
print_current_dir: bool = False,
|
||||||
):
|
) -> List[Path]:
|
||||||
"""This function is called to get a list of header files
|
"""This function is called to get a list of header files
|
||||||
:param search_recursively:
|
:param search_recursively:
|
||||||
:param printout_string:
|
:param printout_string:
|
||||||
@ -52,30 +53,25 @@ class FileListParser:
|
|||||||
|
|
||||||
def __get_header_file_list(
|
def __get_header_file_list(
|
||||||
self,
|
self,
|
||||||
base_directory: str,
|
base_directory: Path,
|
||||||
seach_recursively: bool = False,
|
seach_recursively: bool = False,
|
||||||
print_current_dir: bool = False,
|
print_current_dir: bool = False,
|
||||||
):
|
):
|
||||||
if base_directory[-1] != "/":
|
|
||||||
base_directory += "/"
|
|
||||||
local_header_files = []
|
local_header_files = []
|
||||||
if print_current_dir:
|
if print_current_dir:
|
||||||
print("Parsing header files in: " + base_directory)
|
print(f"Parsing header files in: {base_directory}")
|
||||||
base_list = os.listdir(base_directory)
|
|
||||||
# g.PP.pprint(base_list)
|
# g.PP.pprint(base_list)
|
||||||
for entry in base_list:
|
for entry in base_directory.iterdir():
|
||||||
header_file_match = re.match(r"[_.]*.*\.h", entry)
|
# header_file_match = re.match(r"[_.]*.*\.h", entry.as_posix())
|
||||||
if header_file_match:
|
if (
|
||||||
if os.path.isfile(base_directory + entry):
|
entry.is_file()
|
||||||
match_string = header_file_match.group(0)
|
and entry.suffix == ".h"
|
||||||
if match_string[0] == "." or match_string[0] == "_":
|
and entry.as_posix()[0] not in [".", "_"]
|
||||||
pass
|
):
|
||||||
else:
|
local_header_files.append(entry)
|
||||||
local_header_files.append(base_directory + entry)
|
|
||||||
if seach_recursively:
|
if seach_recursively:
|
||||||
next_path = base_directory + entry
|
if entry.is_dir():
|
||||||
if os.path.isdir(next_path):
|
self.__get_header_file_list(entry, seach_recursively)
|
||||||
self.__get_header_file_list(next_path, seach_recursively)
|
|
||||||
# print("Files found in: " + base_directory)
|
# print("Files found in: " + base_directory)
|
||||||
# g.PP.pprint(local_header_files)
|
# g.PP.pprint(local_header_files)
|
||||||
self.header_files.extend(local_header_files)
|
self.header_files.extend(local_header_files)
|
||||||
|
@ -14,6 +14,7 @@ Child classes fill out the MIB table (self.mib_table)
|
|||||||
import enum
|
import enum
|
||||||
import re
|
import re
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
|
from pathlib import Path
|
||||||
from typing import Dict, List
|
from typing import Dict, List
|
||||||
|
|
||||||
|
|
||||||
@ -24,8 +25,8 @@ class VerbosityLevels(enum.Enum):
|
|||||||
|
|
||||||
|
|
||||||
class FileParserModes(enum.Enum):
|
class FileParserModes(enum.Enum):
|
||||||
REGULAR = (enum.auto(),)
|
REGULAR = enum.auto
|
||||||
MOVING_WINDOW = enum.auto()
|
MOVING_WINDOW = enum.auto
|
||||||
|
|
||||||
|
|
||||||
class FileParser:
|
class FileParser:
|
||||||
@ -39,7 +40,7 @@ class FileParser:
|
|||||||
will be passed through to the abstract function implementations.
|
will be passed through to the abstract function implementations.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, file_list):
|
def __init__(self, file_list: List[Path]):
|
||||||
if len(file_list) == 0:
|
if len(file_list) == 0:
|
||||||
print("File list is empty !")
|
print("File list is empty !")
|
||||||
self.file_list_empty = True
|
self.file_list_empty = True
|
||||||
@ -111,7 +112,7 @@ class FileParser:
|
|||||||
return self.mib_table
|
return self.mib_table
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
def _handle_file_parsing(self, file_name: Path, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
Implemented by child class. The developer should fill the info table (self.mib_table)
|
Implemented by child class. The developer should fill the info table (self.mib_table)
|
||||||
in this routine
|
in this routine
|
||||||
@ -125,7 +126,7 @@ class FileParser:
|
|||||||
@abstractmethod
|
@abstractmethod
|
||||||
def _handle_file_parsing_moving_window(
|
def _handle_file_parsing_moving_window(
|
||||||
self,
|
self,
|
||||||
file_name: str,
|
file_name: Path,
|
||||||
current_line: int,
|
current_line: int,
|
||||||
moving_window_size: int,
|
moving_window_size: int,
|
||||||
moving_window: list,
|
moving_window: list,
|
||||||
@ -151,7 +152,7 @@ class FileParser:
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
|
def __parse_file_with_moving_window(self, file_name: Path, *args, **kwargs):
|
||||||
all_lines = self._open_file(file_name=file_name)
|
all_lines = self._open_file(file_name=file_name)
|
||||||
moving_window_size = self.__parser_args
|
moving_window_size = self.__parser_args
|
||||||
if moving_window_size == 0:
|
if moving_window_size == 0:
|
||||||
@ -212,7 +213,7 @@ class FileParser:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _open_file(file_name: str) -> list:
|
def _open_file(file_name: Path) -> list:
|
||||||
"""
|
"""
|
||||||
Open a file, attempting common encodings utf-8 and cp1252
|
Open a file, attempting common encodings utf-8 and cp1252
|
||||||
:param file_name:
|
:param file_name:
|
||||||
@ -222,7 +223,7 @@ class FileParser:
|
|||||||
file = open(file_name, "r", encoding="utf-8")
|
file = open(file_name, "r", encoding="utf-8")
|
||||||
all_lines = file.readlines()
|
all_lines = file.readlines()
|
||||||
except UnicodeDecodeError:
|
except UnicodeDecodeError:
|
||||||
print("ReturnValueParser: Decoding error with file " + file_name)
|
print(f"Parser: Decoding error with file {file_name}")
|
||||||
file = open(file_name, "r", encoding="cp1252")
|
file = open(file_name, "r", encoding="cp1252")
|
||||||
all_lines = file.readlines()
|
all_lines = file.readlines()
|
||||||
return all_lines
|
return all_lines
|
||||||
|
Loading…
x
Reference in New Issue
Block a user