fsfwgen/fsfwgen/events/event_parser.py

365 lines
13 KiB
Python

import re
import os
from typing import List
from fsfwgen.parserbase.parser import FileParser
from fsfwgen.core import get_console_logger
# Console logger used by EventParser to report duplicate event IDs.
LOGGER = get_console_logger()
# Positions inside the (name, severity, description, source file) tuple that
# EventParser stores per event ID and that export_to_file /
# write_translation_source_file read back.
EVENT_ENTRY_NAME_IDX = 0
EVENT_ENTRY_SEVERITY_IDX = 1
EVENT_ENTRY_INFO_IDX = 2
EVENT_SOURCE_FILE_IDX = 3
# Include directive written verbatim into the generated translation header.
FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"'
# NOTE(review): not referenced in the visible part of this file; presumably the
# window size handed to the moving-window parser by callers - confirm.
DEFAULT_MOVING_WINDOWS_SIZE = 7
# C++ namespace name interpolated into the subsystem ID assignment regex.
SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID"
# Regex capture group that holds the event name in the event definition match.
EVENT_NAME_IDX = 1
class SubsystemDefinitionParser(FileParser):
    """Collect ``NAME = <number>`` subsystem ID definitions from source files.

    Every match is stored in ``self.mib_table`` as ``{NAME: [number_string]}``,
    i.e. the number is kept as the string captured from the source line and is
    wrapped in a one-element list.
    """

    def __init__(self, file_list):
        """Set up the parser for the given list of files.

        :param file_list: Files to parse, forwarded unchanged to FileParser.
        """
        super().__init__(file_list)
        # Index of the line inside the moving window that is inspected.
        self.moving_window_center_idx = 3

    def _handle_file_parsing(self, file_name: str, *args, **kwargs):
        """Scan every line of ``file_name`` for subsystem ID definitions.

        :param file_name: Path of the file to read.
        """
        # The context manager closes the handle even if reading fails; the
        # previous implementation never closed it.
        with open(file_name, "r") as file:
            all_lines = file.readlines()
        for line in all_lines:
            self.__store_id_definition(line)

    def _handle_file_parsing_moving_window(
        self,
        file_name: str,
        current_line: int,
        moving_window_size: int,
        moving_window: list,
        *args,
        **kwargs,
    ):
        """Check the center line of the moving window for an ID definition.

        :param file_name: Path of the file being parsed (unused here).
        :param current_line: Current line number (unused here).
        :param moving_window_size: Size of the window (unused here).
        :param moving_window: Window of lines; only the entry at
            ``self.moving_window_center_idx`` is inspected.
        """
        self.__store_id_definition(moving_window[self.moving_window_center_idx])

    def _post_parsing_operation(self):
        """No post-processing is required for subsystem definitions."""
        pass

    def __store_id_definition(self, text: str):
        """Store the first ``NAME = <1-3 digit number>`` found in ``text``."""
        match = re.search(r"([A-Z0-9_]*) = ([0-9]{1,3})", text)
        if match:
            self.mib_table.update({match.group(1): [match.group(2)]})
class EventParser(FileParser):
    """Extract event definitions from C++ sources into ``self.mib_table``.

    Each recognised event is stored under its full numeric ID as the tuple
    ``(name, severity, description, source file)``. The full ID is computed as
    ``subsystem_id * 100 + unique_id``, where the subsystem ID is looked up in
    ``interface_list`` whenever a ``... = SUBSYSTEM_ID::<NAME>;`` assignment is
    encountered.
    """

    def __init__(self, file_list: List[str], interface_list):
        """Set up the parser.

        :param file_list: Files to parse, forwarded unchanged to FileParser.
        :param interface_list: Mapping from subsystem ID name to a sequence
            whose first element is the ID as a string (presumably the table
            produced by SubsystemDefinitionParser - confirm against caller).
        """
        super().__init__(file_list)
        self.interfaces = interface_list
        # Number of events found in the file currently being parsed.
        self.count = 0
        # Numeric subsystem ID of the most recent SUBSYSTEM_ID assignment.
        self.my_id = 0
        # Raw (string) subsystem ID of the most recent assignment.
        self.current_id = 0
        # If set, stored source file names are made relative to this path.
        self.obsw_root_path = None
        # Only element 0 is used: the previous line for line-by-line parsing.
        self.last_lines = ["", "", ""]
        # Index of the line inside the moving window that is inspected.
        self.moving_window_center_idx = 3

    def _handle_file_parsing(self, file_name: str, *args, **kwargs):
        """Parse ``file_name`` line by line and report how many events it held.

        The file is read as UTF-8 first; if that fails to decode it is re-read
        as cp1252.

        :param file_name: Path of the file to read.
        """
        # Context managers guarantee the handle is closed on both paths; the
        # previous implementation leaked it (twice when the fallback was hit).
        try:
            with open(file_name, "r", encoding="utf-8") as file:
                all_lines = file.readlines()
        except UnicodeDecodeError:
            with open(file_name, "r", encoding="cp1252") as file:
                all_lines = file.readlines()
        for line in all_lines:
            self.__handle_line_reading(line, file_name)
        if self.count > 0:
            print(f"File {file_name} contained {self.count} events.")
            # Reset so the next file starts counting from zero.
            self.count = 0

    def _handle_file_parsing_moving_window(
        self,
        file_name: str,
        current_line: int,
        moving_window_size: int,
        moving_window: list,
        *args,
        **kwargs,
    ):
        """Inspect the center line of the moving window for event definitions.

        :param file_name: Path of the file being parsed; stored with the event.
        :param current_line: Current line number (unused here).
        :param moving_window_size: Size of the window (unused here).
        :param moving_window: Window of lines around the current line. The
            center line and the two lines after it are read, so the window
            must hold at least ``moving_window_center_idx + 3`` entries.
        """
        center_line = moving_window[self.moving_window_center_idx]
        subsystem_id_assignment_match = re.search(
            rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);",
            center_line,
        )
        if subsystem_id_assignment_match:
            # For now, it is assumed that there is only going to be one subsystem ID per
            # class / source file
            try:
                self.current_id = self.interfaces[
                    subsystem_id_assignment_match.group(2)
                ][0]
                self.my_id = self.return_number_from_string(self.current_id)
            except KeyError as e:
                print(f"Key not found: {e}")
        # Now try to look for event definitions. The moving window allows multi line
        # event definitions, so both API variants are searched on the center line and
        # the two lines following it.
        event_match = re.match(
            r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=([^\n]*)",
            center_line,
        )
        if event_match is None:
            return
        macro_api_match = False
        valid_event = False
        for idx in range(3):
            candidate = moving_window[self.moving_window_center_idx + idx]
            if "MAKE_EVENT" in candidate:
                macro_api_match = True
                valid_event = True
                break
            elif "makeEvent" in candidate:
                valid_event = True
                break
        if not valid_event:
            return
        self.__handle_event_match(
            event_match=event_match,
            macro_api_match=macro_api_match,
            moving_window=moving_window,
            file_name=file_name,
        )

    def __handle_event_match(
        self, event_match, macro_api_match: bool, moving_window: list, file_name: str
    ):
        """Register the event described by ``event_match`` in ``mib_table``.

        :param event_match: Match of the ``static const Event NAME =`` prefix.
        :param macro_api_match: True for the ``MAKE_EVENT`` macro variant,
            False for the ``event::makeEvent`` function variant.
        :param moving_window: Window of lines used for multi-line definitions
            and for the description lookup.
        :param file_name: Source file the event was found in.
        """
        if ";" in event_match.group(0):
            # The whole definition fits on the center line.
            event_full_match = self.__generate_regex_event_match(
                macro_api_match=macro_api_match,
                full_string=moving_window[self.moving_window_center_idx],
            )
        else:
            multi_line_string = self.__build_multi_line_event_string(
                first_line=event_match.group(0), moving_window=moving_window
            )
            event_full_match = self.__generate_regex_event_match(
                macro_api_match=macro_api_match, full_string=multi_line_string
            )
        # Helper is not defined in this class; presumably inherited from FileParser.
        description = self._search_for_descrip_string_generic(
            moving_window=moving_window,
            break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*",
        )
        if not event_full_match:
            return
        name = event_match.group(EVENT_NAME_IDX)
        # The function variant has one extra leading capture group (its first
        # argument), which shifts the unique ID and severity groups by one.
        if macro_api_match:
            unique_id_group, severity_group = 2, 3
        else:
            unique_id_group, severity_group = 3, 4
        full_id = (self.my_id * 100) + self.return_number_from_string(
            event_full_match.group(unique_id_group)
        )
        severity = event_full_match.group(severity_group)
        if self.obsw_root_path is not None:
            file_name = os.path.relpath(file_name, self.obsw_root_path)
        existing_entry = self.mib_table.get(full_id)
        if existing_entry is not None:
            LOGGER.warning(f"Duplicate event ID {full_id} detected")
            LOGGER.info(
                f"Name: {existing_entry[0]}| "
                f"Description: {existing_entry[2]}"
            )
        # A duplicate ID replaces the earlier entry.
        self.mib_table.update({full_id: (name, severity, description, file_name)})
        self.count = self.count + 1

    @staticmethod
    def __generate_regex_event_match(macro_api_match: bool, full_string: str):
        """Match a complete event definition inside ``full_string``.

        :param macro_api_match: Selects the ``MAKE_EVENT`` pattern when True,
            the ``event::makeEvent`` pattern otherwise.
        :param full_string: Single-line or joined multi-line definition.
        :return: The ``re.Match`` object, or None if nothing matched.
        """
        if macro_api_match:
            # Groups: 1 = name, 2 = unique ID, 3 = severity.
            regex_string = (
                r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
                r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;"
            )
        else:
            # Groups: 1 = name, 2 = first argument, 3 = unique ID, 4 = severity.
            regex_string = (
                r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*"
                r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;"
            )
        return re.search(regex_string, full_string)

    def __build_multi_line_event_string(
        self, first_line: str, moving_window: List[str]
    ) -> str:
        """Join a definition spread over several lines into one string.

        Delegates to ``_build_multi_line_string_generic``, which is not defined
        in this class (presumably inherited from FileParser).
        """
        return self._build_multi_line_string_generic(
            first_line=first_line, moving_window=moving_window
        )

    def _post_parsing_operation(self):
        """No post-processing is required for events."""
        pass

    def __handle_line_reading(self, line, file_name: str):
        """Parse one line, joined with the previous one, for event definitions.

        Only the ``MAKE_EVENT`` macro variant is recognised here. A lookup of
        an unknown subsystem ID name raises ``KeyError``.

        :param line: The current line of the file.
        :param file_name: Source file the line belongs to.
        """
        previous_line = self.last_lines[0]
        if previous_line == "\n":
            twolines = ""
        else:
            twolines = previous_line + " " + line.strip()
        subsystem_match = re.search(
            r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines
        )
        if subsystem_match:
            self.current_id = self.interfaces[subsystem_match.group(1)][0]
            self.my_id = self.return_number_from_string(self.current_id)
        match = re.search(
            r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*"
            r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)",
            twolines,
        )
        # Group 1 is a leading "//": commented-out definitions are skipped.
        if match and not match.group(1):
            description = " "
            if match.group(6):
                description = self.clean_up_description(match.group(6))
            string_to_add = match.group(2)
            full_id = (self.my_id * 100) + self.return_number_from_string(
                match.group(3)
            )
            severity = match.group(4)
            if self.obsw_root_path is not None:
                file_name = os.path.relpath(file_name, self.obsw_root_path)
            # A duplicate ID replaces the earlier entry without a warning.
            self.mib_table.update(
                {full_id: (string_to_add, severity, description, file_name)}
            )
            self.count = self.count + 1
        self.last_lines[0] = line

    def build_checked_string(self, first_part, second_part):
        """Concatenate ``first_part`` with the CamelCase form of ``second_part``.

        Results longer than 16 characters are reported on stdout and cut down
        to their first 14 characters.

        :return: The (possibly truncated) combined string.
        """
        my_str = first_part + self.convert(second_part)
        if len(my_str) > 16:
            print(f"EventParser: Entry: {my_str} too long. Will truncate.")
            my_str = my_str[0:14]
        return my_str

    @staticmethod
    def return_number_from_string(a_string):
        """Convert a decimal or ``0x``-prefixed hexadecimal string to an int.

        :return: The parsed number, or 0 (after printing a message) if the
            string is in neither representation.
        """
        if a_string.startswith("0x"):
            return int(a_string, 16)
        if a_string.isdigit():
            return int(a_string)
        print("EventParser: Illegal number representation: " + a_string)
        return 0

    @staticmethod
    def convert(name):
        """Turn an underscore separated name into CamelCase.

        Example: ``"MY_EVENT_NAME"`` becomes ``"MyEventName"``.
        """
        return "".join(part.lower().capitalize() for part in name.split("_"))

    @staticmethod
    def clean_up_description(description):
        """Strip leading comment markers and whitespace from a description.

        Any leading run of the characters ``/``, ``!``, ``<`` and ``>`` is
        removed, followed by leading whitespace.

        :return: The cleaned text, or a single space if nothing is left.
        """
        description = description.lstrip("//!<>").lstrip()
        if description == "":
            description = " "
        return description
def export_to_file(filename: str, event_list: list, file_separator: str):
    """Write the event list to ``filename`` as separator-delimited text.

    One line is written per event with the columns: decimal ID, hexadecimal ID
    (``0x`` prefix, zero padded to a total width of six characters), name,
    severity, description and source file.

    :param filename: Path of the file to create or overwrite.
    :param event_list: Iterable of ``(event_id, event_tuple)`` pairs, where
        ``event_tuple`` is indexed with the ``EVENT_ENTRY_*`` /
        ``EVENT_SOURCE_FILE_IDX`` constants.
    :param file_separator: String placed between the columns.
    """
    # The context manager closes the file even if an entry is malformed and
    # formatting raises halfway through.
    with open(filename, "w") as file:
        for entry in event_list:
            event_id = int(entry[0])
            event_value = entry[1]
            columns = (
                str(event_id),
                f"{event_id:#06x}",
                event_value[EVENT_ENTRY_NAME_IDX],
                event_value[EVENT_ENTRY_SEVERITY_IDX],
                event_value[EVENT_ENTRY_INFO_IDX],
                event_value[EVENT_SOURCE_FILE_IDX],
            )
            file.write(file_separator.join(columns) + "\n")
def write_translation_source_file(
    event_list: list, date_string: str, filename: str = "translateEvents.cpp"
):
    """Generate the C++ source file that maps event IDs to their names.

    The file contains one ``const char *<NAME>_STRING`` definition per event
    and a ``translateEvents`` function that switches over the lower 16 bits of
    the event value.

    :param event_list: Iterable of ``(event_id, event_tuple)`` pairs; the name
        is read from ``event_tuple[EVENT_ENTRY_NAME_IDX]``.
    :param date_string: Text placed after "Generated on:" in the file header.
    :param filename: Path of the source file to create or overwrite.
    """
    definitions = []
    cases = []
    for entry in event_list:
        event_id = entry[0]
        event_name = entry[1][EVENT_ENTRY_NAME_IDX]
        definitions.append(f'const char *{event_name}_STRING = "{event_name}";\n')
        cases.append(f" case ({event_id}):\n return {event_name}_STRING;\n")
    function = (
        "const char *translateEvents(Event event) {\n switch ((event & 0xFFFF)) {\n"
        + "".join(cases)
        + ' default:\n return "UNKNOWN_EVENT";\n'
    )
    # The content is assembled before the file is opened so that a malformed
    # entry does not leave a truncated output file behind; the context manager
    # closes the handle even if a write fails.
    with open(filename, "w") as outputfile:
        outputfile.write(
            f"/**\n * @brief Auto-generated event translation file. "
            f"Contains {len(event_list)} translations.\n"
            f" * @details\n"
            f" * Generated on: {date_string}\n */\n"
        )
        # NOTE(review): the include name is fixed and does not follow a custom
        # header file name chosen by the caller of handle_cpp_export.
        outputfile.write('#include "translateEvents.h"\n\n')
        outputfile.write(
            "".join(definitions) + "\n" + function + " }\n return 0;\n}\n"
        )
def write_translation_header_file(filename: str = "translateEvents.h"):
    """Generate the C++ header declaring ``translateEvents``.

    :param filename: Path of the header file to create or overwrite.
    """
    # The previous implementation never closed the file, so the content was
    # only flushed whenever the handle happened to be garbage collected.
    with open(filename, "w") as file:
        file.write(
            "#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n"
            "#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n"
            f"{FSFW_EVENT_HEADER_INCLUDE}\n\n"
            "const char *translateEvents(Event event);\n\n"
            "#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n"
        )
def handle_csv_export(file_name: str, event_list: list, file_separator: str):
    """
    Writes the event list to ``file_name`` as separator-delimited text by
    delegating to export_to_file.
    """
    export_to_file(file_name, event_list, file_separator)
def handle_cpp_export(
    event_list: list,
    date_string: str,
    file_name: str = "translateEvents.cpp",
    generate_header: bool = True,
    header_file_name: str = "translateEvents.h",
):
    """
    Writes the C++ translation source file and, unless ``generate_header`` is
    False, the matching header file.
    """
    write_translation_source_file(event_list, date_string, file_name)
    if not generate_header:
        return
    write_translation_header_file(header_file_name)