init repository
This commit is contained in:
0
parserbase/__init__.py
Normal file
0
parserbase/__init__.py
Normal file
71
parserbase/file_list_parser.py
Normal file
71
parserbase/file_list_parser.py
Normal file
@ -0,0 +1,71 @@
|
||||
"""
|
||||
@file file_list_parser.py
|
||||
@brief Generic File Parser class
|
||||
@details
|
||||
Used by parse header files. Implemented as class in case header parser becomes more complex
|
||||
@author R. Mueller
|
||||
@date 22.11.2019
|
||||
"""
|
||||
import os
|
||||
import re
|
||||
from typing import Union
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
class FileListParser:
|
||||
"""
|
||||
Generic header parser which takes a directory name or directory name list
|
||||
and parses all included header files recursively.
|
||||
"""
|
||||
def __init__(self, directory_list_or_name: Union[str, list]):
|
||||
if isinstance(directory_list_or_name, str):
|
||||
self.directory_list = [directory_list_or_name]
|
||||
elif isinstance(directory_list_or_name, list):
|
||||
self.directory_list = directory_list_or_name
|
||||
else:
|
||||
print("Header Parser: Passed directory list is not a header name or list of "
|
||||
"header names")
|
||||
self.header_files = []
|
||||
|
||||
def parse_header_files(self, search_recursively: bool = False,
|
||||
printout_string: str = "Parsing header files: ",
|
||||
print_current_dir: bool = False):
|
||||
"""
|
||||
This function is called to get a list of header files
|
||||
:param search_recursively:
|
||||
:param printout_string:
|
||||
:param print_current_dir:
|
||||
:return:
|
||||
"""
|
||||
print(printout_string, end="")
|
||||
for directory in self.directory_list:
|
||||
self.__get_header_file_list(directory, search_recursively, print_current_dir)
|
||||
print(str(len(self.header_files)) + " header files were found.")
|
||||
# g.PP.pprint(self.header_files)
|
||||
return self.header_files
|
||||
|
||||
def __get_header_file_list(self, base_directory: str, seach_recursively: bool = False,
|
||||
print_current_dir: bool = False):
|
||||
if base_directory[-1] != '/':
|
||||
base_directory += '/'
|
||||
local_header_files = []
|
||||
if print_current_dir:
|
||||
print("Parsing header files in: " + base_directory)
|
||||
base_list = os.listdir(base_directory)
|
||||
# g.PP.pprint(base_list)
|
||||
for entry in base_list:
|
||||
header_file_match = re.match(r"[_.]*.*\.h", entry)
|
||||
if header_file_match:
|
||||
if os.path.isfile(base_directory + entry):
|
||||
match_string = header_file_match.group(0)
|
||||
if match_string[0] == '.' or match_string[0] == '_':
|
||||
pass
|
||||
else:
|
||||
local_header_files.append(base_directory + entry)
|
||||
if seach_recursively:
|
||||
next_path = base_directory + entry
|
||||
if os.path.isdir(next_path):
|
||||
self.__get_header_file_list(next_path, seach_recursively)
|
||||
# print("Files found in: " + base_directory)
|
||||
# g.PP.pprint(local_header_files)
|
||||
self.header_files.extend(local_header_files)
|
197
parserbase/parser.py
Normal file
197
parserbase/parser.py
Normal file
@ -0,0 +1,197 @@
|
||||
#! /usr/bin/python3
|
||||
"""
|
||||
@file mib_packet_content_parser.py
|
||||
@brief Generic File Parser class
|
||||
@details
|
||||
Used by the MIB Exporter. There are multiple functions which are abstract and should
|
||||
be implemented by a custom parser implementation.
|
||||
|
||||
A file list to parse must be supplied.
|
||||
Child classes fill out the MIB table (self.mib_table)
|
||||
@author R. Mueller
|
||||
@date 14.11.2019
|
||||
"""
|
||||
import enum
|
||||
from abc import abstractmethod
|
||||
from typing import Dict
|
||||
|
||||
|
||||
class FileParserModes(enum.Enum):
|
||||
REGULAR = enum.auto(),
|
||||
MOVING_WINDOW = enum.auto()
|
||||
|
||||
|
||||
class FileParser:
|
||||
"""
|
||||
This parent class gathers common file parser operations into a super class.
|
||||
The user should do the following to use this base class:
|
||||
1. Create a custom parser class which implements this class and implement the abstract
|
||||
functions
|
||||
2. Set the parser mode
|
||||
3. Call parse_files. Additional arguments and keyword arguments can be supplied as well and
|
||||
will be passed through to the abstract function implementations.
|
||||
"""
|
||||
def __init__(self, file_list):
|
||||
if len(file_list) == 0:
|
||||
print("File list is empty !")
|
||||
self.file_list_empty = True
|
||||
else:
|
||||
self.file_list_empty = False
|
||||
self.file_list = file_list
|
||||
# Can be used to have unique key in MIB tables
|
||||
self.index = 0
|
||||
# Initialize empty MIB table which will be filled by specific parser implementation
|
||||
self.mib_table = dict()
|
||||
self.__parser_mode = FileParserModes.REGULAR
|
||||
self.__parser_args = 0
|
||||
|
||||
self.__debug_moving_window = False
|
||||
self.__debug_moving_window_filename = ""
|
||||
|
||||
self._verbose_level = 1
|
||||
|
||||
def set_regular_parser_mode(self):
|
||||
"""
|
||||
Set regular parsing mode. This will be the default, so it is not strictly necessary to call
|
||||
this.
|
||||
:return:
|
||||
"""
|
||||
self.__parser_mode = FileParserModes.REGULAR
|
||||
|
||||
def set_moving_window_mode(self, moving_window_size: int):
|
||||
"""
|
||||
Set moving window parsing mode
|
||||
:param moving_window_size:
|
||||
:return:
|
||||
"""
|
||||
self.__parser_mode = FileParserModes.MOVING_WINDOW
|
||||
self.__parser_args = moving_window_size
|
||||
|
||||
def set_verbosity(self, verbose_level: int):
|
||||
self._verbose_level = verbose_level
|
||||
|
||||
def enable_moving_window_debugging(self, file_name: str):
|
||||
self.__debug_moving_window = True
|
||||
self.__debug_moving_window_filename = file_name
|
||||
|
||||
def parse_files(self, *args: any, **kwargs) -> Dict:
|
||||
"""
|
||||
Core method which is called to parse the files
|
||||
:param args: Optional positional arguments. Passed on the file parser
|
||||
:param kwargs: Optional keyword arguments. Passed on to file parser
|
||||
:return: Returns the mib table dictionary.
|
||||
"""
|
||||
if self.file_list_empty:
|
||||
print(f"Nothing to parse, supplied file list is empty!")
|
||||
return self.mib_table
|
||||
|
||||
if self.__parser_mode == FileParserModes.REGULAR:
|
||||
for file_name in self.file_list:
|
||||
# Implemented by child class ! Fill out info table (self.mib_table) in this routine
|
||||
self._handle_file_parsing(file_name, *args, **kwargs)
|
||||
# Can be implemented by child class to edit the table after it is finished.
|
||||
# default implementation is empty
|
||||
self._post_parsing_operation()
|
||||
elif self.__parser_mode == FileParserModes.MOVING_WINDOW:
|
||||
for file_name in self.file_list:
|
||||
self.__parse_file_with_moving_window(file_name, *args, **kwargs)
|
||||
self._post_parsing_operation()
|
||||
return self.mib_table
|
||||
|
||||
@abstractmethod
|
||||
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
||||
"""
|
||||
Implemented by child class. The developer should fill the info table (self.mib_table)
|
||||
in this routine
|
||||
:param file_name:
|
||||
:param args: Additional arguments passed through the parse_files method.
|
||||
:param kwargs: Additional keyword arguments passed through the parse_files method.
|
||||
:return: Nothing. Fill out the member dictionary self.mib_table in the function instead.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _handle_file_parsing_moving_window(
|
||||
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
|
||||
*args, **kwargs):
|
||||
"""
|
||||
This will be called for the MOVING_WINDOW parser mode.
|
||||
:param file_name: Current file name
|
||||
:param current_line: Current line number.
|
||||
:param moving_window_size: Size of the moving window
|
||||
:param moving_window: Current moving window. The last entry of the moving window
|
||||
is the current line number
|
||||
:return: Nothing. Fill out the member dictionary self.mib_table in the function instead.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _post_parsing_operation(self):
|
||||
"""
|
||||
# Can be implemented by child class to perform post parsing operations (e.g. setting a
|
||||
flag or editting MIB table entries)
|
||||
:return:
|
||||
"""
|
||||
|
||||
def __parse_file_with_moving_window(self, file_name: str, *args, **kwargs):
|
||||
all_lines = self._open_file(file_name=file_name)
|
||||
moving_window_size = self.__parser_args
|
||||
if moving_window_size == 0:
|
||||
print("Moving window size is 0!")
|
||||
return
|
||||
moving_window = [""] * moving_window_size
|
||||
for line_idx, line in enumerate(all_lines):
|
||||
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
|
||||
print(f"Moving window pre line anaylsis line {line_idx}")
|
||||
print(moving_window)
|
||||
# The moving window will start with only the bottom being in the file
|
||||
if line_idx == 0:
|
||||
moving_window[self.__parser_args - 1] = line
|
||||
# More and more of the window is inside the file now
|
||||
elif line_idx < moving_window_size:
|
||||
for idx in range(line_idx, 0, -1):
|
||||
moving_window[moving_window_size - 1 - idx] = \
|
||||
moving_window[moving_window_size - idx]
|
||||
moving_window[moving_window_size - 1] = line
|
||||
# The full window is inside the file now.
|
||||
elif line_idx >= moving_window_size:
|
||||
for idx in range(moving_window_size - 1):
|
||||
moving_window[idx] = moving_window[idx + 1]
|
||||
moving_window[moving_window_size - 1] = line
|
||||
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
|
||||
print(f"Moving window post line anaylsis line {line_idx}")
|
||||
print(moving_window)
|
||||
self._handle_file_parsing_moving_window(
|
||||
file_name, line_idx, moving_window_size, moving_window, *args, **kwargs
|
||||
)
|
||||
# Now the moving window moved past the end of the file. Sections which are outside
|
||||
# the file are assigned an empty string until the window has moved out of file completely
|
||||
for remaining_windows_idx in range(moving_window_size):
|
||||
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
|
||||
print(f"Moving window pre line analysis post EOF")
|
||||
print(moving_window)
|
||||
num_entries_to_clear = remaining_windows_idx + 1
|
||||
for idx_to_clear in range(num_entries_to_clear):
|
||||
moving_window[moving_window_size - 1 - idx_to_clear] = ""
|
||||
for idx_to_reassign in range(moving_window_size - 1 - num_entries_to_clear):
|
||||
moving_window[idx_to_reassign] = moving_window[idx_to_reassign + 1]
|
||||
if self.__debug_moving_window and self.__debug_moving_window_filename in file_name:
|
||||
print(f"Moving window post line anaylsis post EOF")
|
||||
print(moving_window)
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _open_file(file_name: str) -> list:
|
||||
"""
|
||||
Open a file, attempting common encodings utf-8 and cp1252
|
||||
:param file_name:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
file = open(file_name, 'r', encoding='utf-8')
|
||||
all_lines = file.readlines()
|
||||
except UnicodeDecodeError:
|
||||
print("ReturnValueParser: Decoding error with file " + file_name)
|
||||
file = open(file_name, 'r', encoding='cp1252')
|
||||
all_lines = file.readlines()
|
||||
return all_lines
|
Reference in New Issue
Block a user