import re import os from typing import List from fsfwgen.parserbase.parser import FileParser from fsfwgen.core import get_console_logger LOGGER = get_console_logger() EVENT_ENTRY_NAME_IDX = 0 EVENT_ENTRY_SEVERITY_IDX = 1 EVENT_ENTRY_INFO_IDX = 2 EVENT_SOURCE_FILE_IDX = 3 FSFW_EVENT_HEADER_INCLUDE = '#include "fsfw/events/Event.h"' DEFAULT_MOVING_WINDOWS_SIZE = 7 SUBSYSTEM_ID_NAMESPACE = "SUBSYSTEM_ID" EVENT_NAME_IDX = 1 class SubsystemDefinitionParser(FileParser): def __init__(self, file_list): super().__init__(file_list) self.moving_window_center_idx = 3 def _handle_file_parsing(self, file_name: str, *args, **kwargs): file = open(file_name, "r") for line in file.readlines(): match = re.search(r"([A-Z0-9_]*) = ([0-9]{1,3})", line) if match: self.mib_table.update({match.group(1): [match.group(2)]}) def _handle_file_parsing_moving_window( self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, *args, **kwargs, ): match = re.search( r"([A-Z0-9_]*) = ([0-9]{1,3})", moving_window[self.moving_window_center_idx] ) if match: self.mib_table.update({match.group(1): [match.group(2)]}) def _post_parsing_operation(self): pass class EventParser(FileParser): def __init__(self, file_list: List[str], interface_list): super().__init__(file_list) self.interfaces = interface_list self.count = 0 self.my_id = 0 self.current_id = 0 self.obsw_root_path = None self.last_lines = ["", "", ""] self.moving_window_center_idx = 3 def _handle_file_parsing(self, file_name: str, *args: any, **kwargs): try: file = open(file_name, "r", encoding="utf-8") all_lines = file.readlines() except UnicodeDecodeError: file = open(file_name, "r", encoding="cp1252") all_lines = file.readlines() total_count = 0 for line in all_lines: self.__handle_line_reading(line, file_name) if self.count > 0: print("File " + file_name + " contained " + str(self.count) + " events.") total_count += self.count self.count = 0 def _handle_file_parsing_moving_window( self, file_name: str, current_line: int, moving_window_size: int, moving_window: list, *args, **kwargs, ): subsystem_id_assignment_match = re.search( rf"([\w]*)[\s]*=[\s]*{SUBSYSTEM_ID_NAMESPACE}::([A-Z_0-9]*);", moving_window[self.moving_window_center_idx], ) if subsystem_id_assignment_match: # For now, it is assumed that there is only going to be one subsystem ID per # class / source file try: self.current_id = self.interfaces[ subsystem_id_assignment_match.group(2) ][0] self.my_id = self.return_number_from_string(self.current_id) except KeyError as e: print(f"Key not found: {e}") # Now try to look for event definitions. Moving windows allows multi line event definitions # These two variants need to be checked event_match = re.match( r"[\s]*static const(?:expr)?[\s]*Event[\s]*([\w]*)[\s]*=([^\n]*)", moving_window[self.moving_window_center_idx], ) macro_api_match = False for idx in range(3): if "MAKE_EVENT" in moving_window[self.moving_window_center_idx + idx]: macro_api_match = True break elif "makeEvent" in moving_window[self.moving_window_center_idx + idx]: break else: event_match = False if event_match: self.__handle_event_match( event_match=event_match, macro_api_match=macro_api_match, moving_window=moving_window, file_name=file_name, ) def __handle_event_match( self, event_match, macro_api_match: bool, moving_window: list, file_name: str ): if ";" in event_match.group(0): event_full_match = self.__generate_regex_event_match( macro_api_match=macro_api_match, full_string=moving_window[self.moving_window_center_idx], ) else: multi_line_string = self.__build_multi_line_event_string( first_line=event_match.group(0), moving_window=moving_window ) event_full_match = self.__generate_regex_event_match( macro_api_match=macro_api_match, full_string=multi_line_string ) description = self._search_for_descrip_string_generic( moving_window=moving_window, break_pattern=r"[\s]*static const(?:expr)?[\s]*Event[\s]*", ) if event_full_match: name = event_match.group(EVENT_NAME_IDX) if macro_api_match: full_id = (self.my_id * 100) + self.return_number_from_string( event_full_match.group(2) ) severity = event_full_match.group(3) else: full_id = (self.my_id * 100) + self.return_number_from_string( event_full_match.group(3) ) severity = event_full_match.group(4) self.mib_table.update({full_id: (name, severity, description, file_name)}) self.count = self.count + 1 @staticmethod def __generate_regex_event_match(macro_api_match: bool, full_string: str): if macro_api_match: # One line event definition. regex_string = ( r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" r"MAKE_EVENT\(([0-9]{1,3}),[\s]*severity::([A-Z]*)\)[\s]*;" ) else: regex_string = ( r"static const(?:expr)? Event[\s]*([\w]*)[\s]*=[\s]*" r"event::makeEvent\(([\w]*),[\s]*([0-9]{1,3})[\s]*,[\s]*severity::([A-Z]*)\)[\s]*;" ) event_full_match = re.search(regex_string, full_string) return event_full_match def __build_multi_line_event_string( self, first_line: str, moving_window: List[str] ) -> str: return self._build_multi_line_string_generic( first_line=first_line, moving_window=moving_window ) def _post_parsing_operation(self): pass def __handle_line_reading(self, line, file_name: str): if not self.last_lines[0] == "\n": twolines = self.last_lines[0] + " " + line.strip() else: twolines = "" match1 = re.search( r"SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);", twolines ) if match1: self.current_id = self.interfaces[match1.group(1)][0] # print( "Current ID: " + str(currentId) ) self.my_id = self.return_number_from_string(self.current_id) match = re.search( r"(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*" r"MAKE_EVENT\(([0-9]{1,2}),[\s]*severity::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)", twolines, ) if match: if match.group(1): self.last_lines[0] = line return description = " " if match.group(6): description = self.clean_up_description(match.group(6)) string_to_add = match.group(2) full_id = (self.my_id * 100) + self.return_number_from_string( match.group(3) ) severity = match.group(4) if full_id in self.mib_table: # print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name + # " was already in " + self.mib_table[full_id][3]) pass if self.obsw_root_path is not None: file_name = os.path.relpath(file_name, self.obsw_root_path) self.mib_table.update( {full_id: (string_to_add, severity, description, file_name)} ) self.count = self.count + 1 self.last_lines[0] = line def build_checked_string(self, first_part, second_part): my_str = first_part + self.convert(second_part) if len(my_str) > 16: print(f"EventParser: Entry: {my_str} too long. Will truncate.") my_str = my_str[0:14] # else: # print( "Entry: " + myStr + " is all right.") return my_str @staticmethod def return_number_from_string(a_string): if a_string.startswith("0x"): return int(a_string, 16) elif a_string.isdigit(): return int(a_string) else: print("EventParser: Illegal number representation: " + a_string) return 0 @staticmethod def convert(name): single_strings = name.split("_") new_string = "" for one_string in single_strings: one_string = one_string.lower() one_string = one_string.capitalize() new_string = new_string + one_string return new_string @staticmethod def clean_up_description(description): description = description.lstrip("//!<>") description = description.lstrip() if description == "": description = " " return description def export_to_file(filename: str, event_list: list, file_separator: str): file = open(filename, "w") for entry in event_list: event_id = entry[0] event_value = entry[1] file.write( str(event_id) + file_separator + event_value[EVENT_ENTRY_NAME_IDX] + file_separator + event_value[EVENT_ENTRY_SEVERITY_IDX] + file_separator + event_value[EVENT_ENTRY_INFO_IDX] + file_separator + event_value[EVENT_SOURCE_FILE_IDX] + "\n" ) file.close() return def write_translation_source_file( event_list: list, date_string: str, filename: str = "translateEvents.cpp" ): outputfile = open(filename, "w") definitions = "" function = ( "const char *translateEvents(Event event) {\n switch((event & 0xFFFF)) {\n" ) for entry in event_list: event_id = entry[0] event_value = entry[1] definitions += ( f"const char *{event_value[EVENT_ENTRY_NAME_IDX]}_STRING " f'= "{event_value[EVENT_ENTRY_NAME_IDX]}";\n' ) function += ( f" case ({event_id}):\n " f"return {event_value[EVENT_ENTRY_NAME_IDX]}_STRING;\n" ) function += ' default:\n return "UNKNOWN_EVENT";\n' outputfile.write( f"/**\n * @brief Auto-generated event translation file. " f"Contains {len(event_list)} translations.\n" f" * @details\n" f" * Generated on: {date_string}\n */\n" ) outputfile.write('#include "translateEvents.h"\n\n') outputfile.write(definitions + "\n" + function + " }\n return 0;\n}\n") outputfile.close() def write_translation_header_file(filename: str = "translateEvents.h"): file = open(filename, "w") file.write( f"#ifndef FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n" f"#define FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_\n\n" f"{FSFW_EVENT_HEADER_INCLUDE}\n\n" f"const char *translateEvents(Event event);\n\n" f"#endif /* FSFWCONFIG_EVENTS_TRANSLATEEVENTS_H_ */\n" ) def handle_csv_export(file_name: str, event_list: list, file_separator: str): """ Generates the CSV in the same directory as the .py file and copes the CSV to another directory if specified. """ export_to_file( filename=file_name, event_list=event_list, file_separator=file_separator ) def handle_cpp_export( event_list: list, date_string: str, file_name: str = "translateEvents.cpp", generate_header: bool = True, header_file_name: str = "translateEvents.h", ): write_translation_source_file( event_list=event_list, date_string=date_string, filename=file_name ) if generate_header: write_translation_header_file(filename=header_file_name)