268 lines
11 KiB
Python
268 lines
11 KiB
Python
import re
|
|
|
|
from fsfwgen.parserbase.parser import FileParser
|
|
|
|
EVENT_ENTRY_NAME_IDX = 0
|
|
EVENT_ENTRY_SEVERITY_IDX = 1
|
|
EVENT_ENTRY_INFO_IDX = 2
|
|
EVENT_SOURCE_FILE_IDX = 3
|
|
FSFW_EVENT_HEADER_INCLUDE = "#include <fsfw/events/Event.h>"
|
|
DEFAULT_MOVING_WINDOWS_SIZE = 7
|
|
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
|
|
|
|
EVENT_NAME_IDX = 1
|
|
|
|
|
|
class SubsystemDefinitionParser(FileParser):
|
|
def __init__(self, file_list):
|
|
super().__init__(file_list)
|
|
self.moving_window_center_idx = 3
|
|
|
|
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
|
|
file = open(file_name, "r")
|
|
for line in file.readlines():
|
|
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', line)
|
|
if match:
|
|
self.mib_table.update({match.group(1): [match.group(2)]})
|
|
|
|
def _handle_file_parsing_moving_window(
|
|
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
|
|
*args, **kwargs):
|
|
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,3})', moving_window[self.moving_window_center_idx])
|
|
if match:
|
|
self.mib_table.update({match.group(1): [match.group(2)]})
|
|
|
|
def _post_parsing_operation(self):
|
|
pass
|
|
|
|
|
|
class EventParser(FileParser):
|
|
def __init__(self, file_list, interface_list):
|
|
super().__init__(file_list)
|
|
self.interfaces = interface_list
|
|
self.count = 0
|
|
self.my_id = 0
|
|
self.current_id = 0
|
|
self.last_lines = ["", "", ""]
|
|
self.moving_window_center_idx = 3
|
|
|
|
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
|
|
try:
|
|
file = open(file_name, 'r', encoding='utf-8')
|
|
all_lines = file.readlines()
|
|
except UnicodeDecodeError:
|
|
file = open(file_name, 'r', encoding='cp1252')
|
|
all_lines = file.readlines()
|
|
total_count = 0
|
|
for line in all_lines:
|
|
self.__handle_line_reading(line, file_name)
|
|
if self.count > 0:
|
|
print("File " + file_name + " contained " + str(self.count) + " events.")
|
|
total_count += self.count
|
|
self.count = 0
|
|
|
|
def _handle_file_parsing_moving_window(
|
|
self, file_name: str, current_line: int, moving_window_size: int, moving_window: list,
|
|
*args, **kwargs):
|
|
subsystem_id_assignment_match = re.search(
|
|
rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);", moving_window[self.moving_window_center_idx]
|
|
)
|
|
if subsystem_id_assignment_match:
|
|
# For now, it is assumed that there is only going to be one subsystem ID per class / source file
|
|
# if "SUBSYSTEM_ID" in subsystem_id_assignment_match.group(1):
|
|
try:
|
|
self.current_id = self.interfaces[subsystem_id_assignment_match.group(2)][0]
|
|
self.my_id = self.return_number_from_string(self.current_id)
|
|
except KeyError as e:
|
|
print(f"Key not found: {e}")
|
|
|
|
# Now try to look for event definitions. Moving windows allows multi line event definitions
|
|
# These two variants need to be checked
|
|
event_match = re.match(
|
|
r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*event::makeEvent[^\n]*",
|
|
moving_window[self.moving_window_center_idx]
|
|
)
|
|
macro_api_match = False
|
|
if not event_match:
|
|
event_match = re.match(
|
|
r"[\s]*static[\s]*const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=[\s]*MAKE_EVENT[^\n]*",
|
|
moving_window[self.moving_window_center_idx]
|
|
)
|
|
if event_match:
|
|
macro_api_match = True
|
|
if event_match:
|
|
self.__handle_event_match(
|
|
event_match=event_match, macro_api_match=macro_api_match, moving_window=moving_window,
|
|
file_name=file_name
|
|
)
|
|
|
|
def __handle_event_match(self, event_match, macro_api_match: bool, moving_window: list, file_name: str):
|
|
event_full_match = False
|
|
if ";" in event_match.group(0):
|
|
event_full_match = self.__handle_one_line_event_match(
|
|
macro_api_match=macro_api_match, moving_window=moving_window
|
|
)
|
|
# Description will be parsed separately later
|
|
description = " "
|
|
if event_full_match:
|
|
name = event_match.group(EVENT_NAME_IDX)
|
|
if macro_api_match:
|
|
full_id = (self.my_id * 100) + self.return_number_from_string(event_full_match.group(2))
|
|
severity = event_full_match.group(3)
|
|
else:
|
|
full_id = (self.my_id * 100) + self.return_number_from_string(event_full_match.group(3))
|
|
severity = event_full_match.group(4)
|
|
self.mib_table.update({full_id: (name, severity, description, file_name)})
|
|
self.count = self.count + 1
|
|
|
|
def __handle_one_line_event_match(self, macro_api_match: bool, moving_window: list):
|
|
if macro_api_match:
|
|
# One line event definition.
|
|
regex_string = \
|
|
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
|
|
r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
|
|
else:
|
|
regex_string = \
|
|
r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" \
|
|
r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;"
|
|
event_full_match = re.search(regex_string, moving_window[self.moving_window_center_idx])
|
|
return event_full_match
|
|
|
|
def _post_parsing_operation(self):
|
|
pass
|
|
|
|
def __handle_line_reading(self, line, file_name):
|
|
if not self.last_lines[0] == '\n':
|
|
twolines = self.last_lines[0] + ' ' + line.strip()
|
|
else:
|
|
twolines = ''
|
|
match1 = re.search(r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines)
|
|
if match1:
|
|
self.current_id = self.interfaces[match1.group(1)][0]
|
|
# print( "Current ID: " + str(currentId) )
|
|
self.my_id = self.return_number_from_string(self.current_id)
|
|
match = re.search(
|
|
r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
|
|
r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", twolines
|
|
)
|
|
if match:
|
|
if match.group(1):
|
|
self.last_lines[0] = line
|
|
return
|
|
description = " "
|
|
if match.group(6):
|
|
description = self.clean_up_description(match.group(6))
|
|
string_to_add = match.group(2)
|
|
full_id = (self.my_id * 100) + self.return_number_from_string(match.group(3))
|
|
severity = match.group(4)
|
|
if full_id in self.mib_table:
|
|
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
|
|
# " was already in " + self.mib_table[full_id][3])
|
|
pass
|
|
self.mib_table.update({full_id: (string_to_add, severity, description, file_name)})
|
|
self.count = self.count + 1
|
|
self.last_lines[0] = line
|
|
|
|
def build_checked_string(self, first_part, second_part):
|
|
my_str = first_part + self.convert(second_part)
|
|
if len(my_str) > 16:
|
|
print(f"EventParser: Entry: {my_str} too long. Will truncate.")
|
|
my_str = my_str[0:14]
|
|
# else:
|
|
# print( "Entry: " + myStr + " is all right.")
|
|
return my_str
|
|
|
|
@staticmethod
|
|
def return_number_from_string(a_string):
|
|
if a_string.startswith('0x'):
|
|
return int(a_string, 16)
|
|
elif a_string.isdigit():
|
|
return int(a_string)
|
|
else:
|
|
print('EventParser: Illegal number representation: ' + a_string)
|
|
return 0
|
|
|
|
@staticmethod
|
|
def convert(name):
|
|
single_strings = name.split('_')
|
|
new_string = ''
|
|
for one_string in single_strings:
|
|
one_string = one_string.lower()
|
|
one_string = one_string.capitalize()
|
|
new_string = new_string + one_string
|
|
return new_string
|
|
|
|
@staticmethod
|
|
def clean_up_description(description):
|
|
description = description.lstrip('//!<>')
|
|
description = description.lstrip()
|
|
if description == '':
|
|
description = ' '
|
|
return description
|
|
|
|
|
|
def export_to_file(filename: str, event_list: list, file_separator: str):
|
|
file = open(filename, "w")
|
|
for entry in event_list:
|
|
event_id = entry[0]
|
|
event_value = entry[1]
|
|
file.write(
|
|
str(event_id) + file_separator + event_value[EVENT_ENTRY_NAME_IDX] + file_separator +
|
|
event_value[EVENT_ENTRY_SEVERITY_IDX] + file_separator + event_value[EVENT_ENTRY_INFO_IDX] +
|
|
file_separator + event_value[EVENT_SOURCE_FILE_IDX] + '\n'
|
|
)
|
|
file.close()
|
|
return
|
|
|
|
|
|
def write_translation_source_file(event_list: list, date_string: str, filename: str = "translateEvents.cpp"):
|
|
outputfile = open(filename, "w")
|
|
definitions = ""
|
|
|
|
function = "const char * translateEvents(Event event) {\n\tswitch( (event & 0xffff) ) {\n"
|
|
for entry in event_list:
|
|
event_id = entry[0]
|
|
event_value = entry[1]
|
|
definitions += \
|
|
f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING = \"{event_value[EVENT_ENTRY_NAME_IDX]}\";\n"
|
|
function += f"\tcase({event_id}):\n\t\treturn {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n"
|
|
function += '\tdefault:\n\t\treturn "UNKNOWN_EVENT";\n'
|
|
outputfile.write(
|
|
f"/**\n * @brief Auto-generated event translation file. Contains {len(event_list)} translations.\n"
|
|
f" * @details\n"
|
|
f" * Generated on: {date_string}\n */\n"
|
|
)
|
|
outputfile.write("#include \"translateEvents.h\"\n\n")
|
|
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
|
|
outputfile.close()
|
|
|
|
|
|
def write_translation_header_file(filename: str = "translateEvents.h"):
|
|
file = open(filename, "w")
|
|
file.write(
|
|
f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
|
|
f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
|
|
f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
|
|
f"const char * translateEvents(Event event);\n\n"
|
|
f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
|
|
)
|
|
|
|
|
|
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
|
|
"""
|
|
Generates the CSV in the same directory as the .py file and copes the CSV to another
|
|
directory if specified.
|
|
"""
|
|
print("EventParser: Exporting to file: " + file_name)
|
|
export_to_file(filename=file_name, event_list=event_list, file_separator=file_separator)
|
|
|
|
|
|
def handle_cpp_export(
|
|
event_list: list, date_string: str, file_name: str = "translateEvents.cpp", generate_header: bool = True,
|
|
header_file_name: str = "translateEvents.h"
|
|
):
|
|
print("EventParser: Generating translation cpp file.")
|
|
write_translation_source_file(event_list=event_list, date_string=date_string, filename=file_name)
|
|
if generate_header:
|
|
write_translation_header_file(filename=header_file_name)
|