Init commit

2021-04-27 17:22:34 +02:00
commit 4f6fe6959f
1140 changed files with 1174277 additions and 0 deletions


@@ -0,0 +1,215 @@
#! /usr/bin/python3.7
"""
@file
mib_objects.py
@brief
Part of the Mission Information Base Exporter for the SOURCE project by KSat.
@details
Event exporter.
To use MySQLdb, run pip install mysqlclient or install it via the IDE.
On Windows, installing the Build Tools might be necessary.
@date
21.11.2019
"""
import re
import os
import shutil
import pprint
from utility.mib_csv_writer import CsvWriter
generateCpp = True
generateCsv = True
copyCppFile = True
moveCsvFile = True
cppFilename = "translateEvents.cpp"
csvFilename = "MIB_Events.csv"
cppCopyDestination = "../../config/events/"
csvMoveDestination = "../"
fileSeparator = ";"
pp = pprint.PrettyPrinter(indent=0, width=250)
def main():
print("Parsing events: ")
listItems = parseOBSW()
handleFileExport(listItems)
print("")
def parseOBSW():
idSubsystemDefinitions = parseSubsystemDefinitionFile("../../config/tmtc/subsystemIdRanges.h")
tempList = parseSubsystemDefinitionFile("../../fsfw/events/fwSubsystemIdRanges.h")
idSubsystemDefinitions.update(tempList)
    print("Found " + str(len(idSubsystemDefinitions)) + " subsystem definitions.")
# print ("Dictionary size is " + str(len(idInterfaceDefinitions)) )
# for entry in idSubsystemDefinitions:
# print(entry)
myHeaderList = getHeaderFileList("../../mission/")
myHeaderList = myHeaderList + getHeaderFileList("../../fsfw/")
myEventList = parseHeaderFiles(idSubsystemDefinitions, myHeaderList)
listItems = sorted(myEventList.items())
print("Found " + str(len(listItems)) + " entries:")
pp.pprint(listItems)
print(len(listItems))
return listItems
def handleFileExport(listItems):
csvWriter = CsvWriter(csvFilename)
if generateCpp:
print("Generating translation cpp file.")
writeTranslationFile(cppFilename, listItems)
if generateCpp and copyCppFile:
        dst = shutil.copy2(cppFilename, cppCopyDestination)
print("CPP file was copied to " + dst)
if generateCsv:
print("Generating text export.")
exportToFile(csvFilename, listItems)
if generateCsv and moveCsvFile:
csvWriter.move_csv(csvMoveDestination)
# The output files are generated by putting the name of the output CPP file as the first argument and the
# name of the CSV or TXT output as the second argument in the Run Configuration.
# Config Parameters: translateEvents.cpp translateEvents.csv
def parseSubsystemDefinitionFile(filename):
file = open(filename, "r")
interfaces = dict()
for line in file.readlines():
match = re.search('([A-Z0-9_]*) = ([0-9]{1,2})', line)
if match:
interfaces.update({match.group(1): [match.group(2)]})
return interfaces
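# Illustrative sketch (hypothetical subsystem entry): a header line such as
#   CDH = 28,
# in subsystemIdRanges.h would be matched by the regex above and yield the
# dictionary entry {'CDH': ['28']}.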
def returnNumberFromString(aString):
if aString.startswith('0x'):
return int(aString, 16)
elif aString.isdigit():
return int(aString)
else:
print('Error: Illegal number representation: ' + aString)
return 0
def convert(name):
singleStrings = name.split('_')
newString = ''
for oneString in singleStrings:
oneString = oneString.lower()
oneString = oneString.capitalize()
newString = newString + oneString
return newString
def buildCheckedString(firstPart, secondPart):
myStr = firstPart + convert(secondPart)
if len(myStr) > 16:
print("Error: Entry: " + myStr + " too long. Will truncate.")
myStr = myStr[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return myStr
def cleanUpDescription(description):
description = description.lstrip('//!<>')
description = description.lstrip()
if description == '':
description = ' '
return description
def parseHeaderFiles(interfaceList, fileList):
    dictionary = dict()
totalCount = 0
count = 0
# noinspection PyUnusedLocal
currentId = 0
for fileName in fileList:
file = open(fileName, "r")
oldline = file.readline()
myId = 0
# print(file_name)
while True:
newline = file.readline()
if not newline:
break # EOF
if not oldline == '\n':
twolines = oldline + ' ' + newline.strip()
else:
twolines = ''
            match1 = re.search(r'SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);', twolines)
if match1:
currentId = interfaceList[match1.group(1)][0]
# print( "Current ID: " + str(currentId) )
myId = returnNumberFromString(currentId)
            match = re.search(r'(//)?[\t ]*static const Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*MAKE_EVENT\(([0-9]{1,2}),'
                              r'[\s]*SEVERITY::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)', twolines)
if match:
if match.group(1):
oldline = newline
continue
description = " "
if match.group(6):
description = cleanUpDescription(match.group(6))
stringToAdd = match.group(2)
fullId = (myId * 100) + returnNumberFromString(match.group(3))
severity = match.group(4)
                if fullId in dictionary:
                    print('Duplicate event ' + hex(fullId) + ' from ' + fileName + ' was already in ' +
                          dictionary[fullId][3])
                dictionary.update({fullId: (stringToAdd, severity, description, fileName)})
count = count + 1
oldline = newline
if count > 0:
print("File " + fileName + " contained " + str(count) + " events.")
totalCount += count
count = 0
print("Total events: " + str(totalCount))
    return dictionary
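# Illustrative example with hypothetical values: if SUBSYSTEM_ID::CDH resolves to 28,
# a header line like
#   static const Event SOME_EVENT = MAKE_EVENT(0, SEVERITY::INFO); //!< Some description
# yields the full event ID 28 * 100 + 0 = 2800 with severity INFO and the given description.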
def getHeaderFileList(base):
# print("getHeaderFileList called with" + base)
baseList = os.listdir(base)
fileList = []
for entry in baseList:
# Remove all hidden files:
if os.path.isdir(base + entry) and (entry[0] != ".") and (entry[0] != "_"):
fileList = fileList + getHeaderFileList(base + entry + "/")
        if re.match(r"[^.]*\.h", entry) and os.path.isfile(base + entry):
fileList.append(base + entry)
return fileList
def exportToFile(filename, listOfEntries):
print('Exporting to file: ' + filename)
file = open(filename, "w")
for entry in listOfEntries:
file.write(str(entry[0]) + fileSeparator + entry[1][0] + fileSeparator + entry[1][1]
+ fileSeparator + entry[1][2] + fileSeparator + entry[1][3] + '\n')
file.close()
return
def writeTranslationFile(filename, listOfEntries):
outputfile = open(filename, "w")
definitions = ""
function = "const char * translateEvents(Event event){\n\tswitch((event&0xFFFF)){\n"
for entry in listOfEntries:
definitions += "const char *" + entry[1][0] + "_STRING = \"" + entry[1][0] + "\";\n"
function += "\t\tcase " + str(entry[0]) + ":\n\t\t\treturn " + entry[1][0] + "_STRING;\n"
function += '\t\tdefault:\n\t\t\treturn "UNKNOWN_EVENT";\n'
outputfile.write("/* Auto-generated event translation file. Contains "
+ str(len(listOfEntries)) + " translations. */\n")
outputfile.write("#include \"translateEvents.h\"\n\n")
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
if __name__ == "__main__":
main()


@@ -0,0 +1,269 @@
#! /usr/bin/python3.7
"""
@file
MIB_Returnvalues.py
@brief
Part of the Mission Information Base Exporter for the SOURCE project by KSat.
TODO: Integrate into Parser Structure instead of calling this file (no cpp file generated yet)
@details
Returnvalue exporter.
To use MySQLdb, run pip install mysqlclient or install it via the IDE.
On Windows, installing the Build Tools might be necessary.
@date
21.11.2019
"""
import pprint
import re
import os
import getpass
# import MySQLdb  # needed only for writeEntriesToDB / writeEntriesToOtherDB
from utility.mib_csv_writer import CsvWriter
doExportToFile = True
moveCsvFile = True
csvFilename = "MIB_Returnvalues.csv"
csvMoveDestination = "../"
fileSeparator = ';'
maxStringLength = 25
pp = pprint.PrettyPrinter(indent=0, width=250)
def main():
print("Parsing Returnvalues: ")
parseOBSW()
handleFileExport()
print("")
def parseOBSW():
idInterfaceDefinitions = \
parseInterfaceDefinitionFile("../../fsfw/returnvalues/FwClassIds.h",
"../../config/returnvalues/classIds.h")
print("Found interface definitions: ")
pp.pprint(idInterfaceDefinitions)
myHeaderList = getHeaderFileList("../../mission/")
myHeaderList = myHeaderList + getHeaderFileList("../../fsfw/")
myHeaderList = myHeaderList + getHeaderFileList("../../config/")
myHeaderList = myHeaderList + getHeaderFileList("../../sam9g20/")
mySecondList = parseHeaderFiles(idInterfaceDefinitions, myHeaderList)
    print("Parsed " + str(len(myHeaderList)) + " header files.")
    print("Found " + str(len(mySecondList)) + " returnvalue entries.")
# print(mySecondList[14081])
# print (mySecondList.items()[0][1])
# print( "Found entries:" )
counter = 0
# for entry in sorted(mySecondList):
# print(entry)
# noinspection PyUnusedLocal
for entry in mySecondList.items():
counter = counter + 1
# print( entry[0], entry[1][0], entry[1][1] )
print("Count: ", counter)
if doExportToFile:
exportToFile(csvFilename, mySecondList)
else:
print('No export to file requested.')
# writeEntriesToOtherDB( mySecondList )
# writeEntriesToDB( mySecondList )
def handleFileExport():
csvWriter = CsvWriter(csvFilename)
if moveCsvFile:
csvWriter.move_csv(csvMoveDestination)
def parseInterfaceDefinitionFile(fwFilename, missionFilename):
file = open(fwFilename, "r")
interfaces = dict()
allLines = file.readlines()
count = 0
countMatched = False
# Parses first entry, which has explicit value 1
for line in allLines:
if not countMatched:
            match = re.search(r'[\s]*([A-Z_0-9]*) = ([0-9]*),[\s]*//([A-Z]{1,3})', line)
        else:
            match = re.search(r'[\s]*([A-Z_0-9]*),[\s]*//([A-Z]{1,3})', line)
if match and not countMatched:
count = int(match.group(2))
interfaces.update({match.group(1): [1, match.group(3)]})
count += 1
countMatched = True
elif match:
interfaces.update({match.group(1): [count, match.group(2)]})
count += 1
file = open(missionFilename, "r")
allLines = file.readlines()
for line in allLines:
        match = re.search(r'[\s]*([A-Z_0-9]*) = FW_CLASS_ID_COUNT,[\s]*(//([A-Z]{1,3}))?', line)
        if match:
            interfaces.update({match.group(1): [count, match.group(3)]})
count += 1
for line in allLines:
        match = re.search(r'^[\s]*([A-Z_0-9]*)[,]*[\s]*//[!<]*[\s]*([^\n]*)', line)
if match:
interfaces.update({match.group(1): [count, match.group(2)]})
count += 1
print("Found interfaces : " + str(count - 1))
return interfaces
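# Illustrative sketch of the class ID layout this function expects (hypothetical entries):
#   FW_CLASS_ID_START = 1, //FW
#   SOME_INTERFACE, //SIF
# The first entry carries an explicit value; each following entry is assigned the
# incremented running count, keyed by its short comment abbreviation.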
def returnNumberFromString(aString):
if aString.startswith('0x'):
return int(aString, 16)
elif aString.isdigit():
return int(aString)
else:
        print('Error: Illegal number representation: ' + aString)
return 0
def convert(name):
singleStrings = name.split('_')
newString = ''
for oneString in singleStrings:
oneString = oneString.lower()
oneString = oneString.capitalize()
newString = newString + oneString
return newString
def buildCheckedString(firstPart, secondPart):
myStr = firstPart + convert(secondPart)
if len(myStr) > maxStringLength:
print("Error: Entry: " + myStr + " too long. Will truncate.")
myStr = myStr[0:maxStringLength]
    # else:
    #     print("Entry: " + myStr + " is all right.")
return myStr
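# Illustrative behaviour of the helpers above (hypothetical inputs):
#   convert("FIX_MODE") returns "FixMode"
#   buildCheckedString("Gps0", "FIX_MODE") returns "Gps0FixMode"
# Strings longer than maxStringLength are truncated and a warning is printed.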
def cleanUpDescription(descrString):
description = descrString.lstrip('!<- ')
if description == '':
description = ' '
return description
def parseHeaderFiles(interfaceList, fileList):
    dictionary = dict()
count = 0
currentName = ""
myId = 0
    dictionary.update({0: ('OK', 'System-wide code for ok.', 'RETURN_OK', 'HasReturnvaluesIF.h',
                           'HasReturnvaluesIF')})
    dictionary.update({1: ('Failed', 'Unspecified system-wide code for failed.',
                           'RETURN_FAILED', 'HasReturnvaluesIF.h', 'HasReturnvaluesIF')})
print('')
print("Parsing files: ")
for fileName in fileList:
# print("Parsing file " + fileName + ": ")
file = open(fileName, "r")
oldline = file.readline()
while True:
currentFullName = ""
newline = file.readline()
if not newline:
break # EOF
if not oldline == '\n':
twoLines = oldline + ' ' + newline.strip()
else:
twoLines = ''
            match1 = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)', twoLines)
if match1:
# print("Interface ID" + str(match1.group(1)) + "found in " + fileName)
currentId = interfaceList[match1.group(1)][0]
currentName = interfaceList[match1.group(1)][1]
currentFullName = match1.group(1)
# print( "Current ID: " + str(currentId) )
myId = currentId
            match = re.search(r'^[\s]*static const ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
                              r'MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)',
                              twoLines)
if match:
# valueTable.append([])
description = cleanUpDescription(match.group(4))
stringToAdd = buildCheckedString(currentName, match.group(1))
fullId = (myId << 8) + returnNumberFromString(match.group(2))
                if fullId in dictionary:
                    print('Duplicate returncode ' + hex(fullId) + ' from ' + fileName + ' was already in ' +
                          dictionary[fullId][3])
                dictionary.update({fullId: (stringToAdd, description, match.group(1), fileName, currentFullName)})
# valueTable[count].append(fullId)
# valueTable[count].append(stringToAdd)
count = count + 1
else:
pass
oldline = newline
# valueTable.pop()
    return dictionary
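# Illustrative full ID computation (hypothetical values): for an interface with
# class ID 18, a definition such as
#   static const ReturnValue_t INVALID_SIZE = MAKE_RETURN_CODE(0x01); //!< Size error
# yields the full returnvalue ID (18 << 8) + 0x01 = 0x1201.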
def getHeaderFileList(base):
# print ( "getHeaderFileList called with" + base )
baseList = os.listdir(base)
fileList = []
for entry in baseList:
# Remove all hidden files:
if os.path.isdir(base + entry) and (entry[0] != ".") and (entry[0] != "_"):
fileList = fileList + getHeaderFileList(base + entry + "/")
        if re.match(r"[^.]*\.h", entry) and os.path.isfile(base + entry):
fileList.append(base + entry)
return fileList
def writeEntriesToDB(listOfEntries):
    # Requires the MySQLdb import at the top of this file to be uncommented.
    print("Connecting to database...")
user = getpass.getpass("User: ")
passwd = getpass.getpass()
conn = MySQLdb.connect(host="127.0.0.1", user=user, passwd=passwd, db="flpmib")
written = conn.cursor()
print("done.")
# delete old entries
print("Kill old entries.")
written.execute("DELETE FROM txp WHERE TXP_NUMBR = 'DSX00000'")
print("Insert new ones:")
for entry in listOfEntries.items():
written.execute("INSERT INTO txp (txp_numbr, txp_from, txp_to, txp_altxt) VALUES ('DSX00000', %s, %s, %s)",
[entry[0], entry[0], entry[1][0]])
conn.commit()
print("Done. That was easy.")
def writeEntriesToOtherDB(listOfEntries):
print("Connecting to other database...")
conn = MySQLdb.connect(host="buggy.irs.uni-stuttgart.de",
user='returncode', passwd='returncode', db="returncode")
written = conn.cursor()
print("connected.")
# delete old entries
print("Kill old entries.")
written.execute("DELETE FROM returncodes WHERE true")
print("Insert new ones:")
for entry in listOfEntries.items():
written.execute("INSERT INTO returncodes (code,name,interface,file,description) VALUES (%s, %s, %s, %s, %s)",
[entry[0], entry[1][2], entry[1][4], entry[1][3], entry[1][1]])
conn.commit()
print("Done. That was hard.")
def exportToFile(filename_, listOfEntries):
    print('Exporting to file: ' + filename_)
file = open(filename_, "w")
for entry in listOfEntries.items():
file.write(hex(entry[0]) + fileSeparator + entry[1][0] + fileSeparator + entry[1][1] +
fileSeparator + entry[1][2] + fileSeparator
+ entry[1][3] + fileSeparator + entry[1][4] + '\n')
file.close()
return
if __name__ == "__main__":
main()


@@ -0,0 +1,71 @@
/**
* @file dataPoolInit.cpp
*
* @brief Auto-Generated datapool initialization
* @date 02.05.2020
*/
#include <config/dataPool/dataPoolInit.h>
void datapool::dataPoolInit(poolMap * poolMap) {
/* FSFW */
poolMap->emplace(datapool::INTERNAL_ERROR_STORE_FULL,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::INTERNAL_ERROR_MISSED_LIVE_TM,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::INTERNAL_ERROR_FULL_MSG_QUEUES,
new PoolEntry<uint32_t>({0},1));
/* GPS 0 */
poolMap->emplace(datapool::GPS0_FIX_MODE,
new PoolEntry<uint8_t>({0},1));
poolMap->emplace(datapool::GPS0_NUMBER_OF_SV_IN_FIX,
new PoolEntry<uint8_t>({0},1));
poolMap->emplace(datapool::GPS0_GNSS_WEEK,
new PoolEntry<uint16_t>({0},1));
poolMap->emplace(datapool::GPS0_TIME_OF_WEEK,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS0_LATITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS0_LONGITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS0_MEAN_SEA_ALTITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS0_POSITION,
new PoolEntry<double>({0, 0, 0},3));
poolMap->emplace(datapool::GPS0_VELOCITY,
new PoolEntry<double>({0, 0, 0},3));
/* GPS 1 */
poolMap->emplace(datapool::GPS1_FIX_MODE,
new PoolEntry<uint8_t>({0},1));
poolMap->emplace(datapool::GPS1_NUMBER_OF_SV_IN_FIX,
new PoolEntry<uint8_t>({0},1));
poolMap->emplace(datapool::GPS1_GNSS_WEEK,
new PoolEntry<uint16_t>({0},1));
poolMap->emplace(datapool::GPS1_TIME_OF_WEEK,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS1_LATITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS1_LONGITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS1_MEAN_SEA_ALTITUDE,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::GPS1_POSITION,
new PoolEntry<double>({0, 0, 0},3));
poolMap->emplace(datapool::GPS1_VELOCITY,
new PoolEntry<double>({0, 0, 0},3));
/* TEST */
poolMap->emplace(datapool::TEST_BOOLEAN,
new PoolEntry<bool>({0},1));
poolMap->emplace(datapool::TEST_UINT8,
new PoolEntry<uint8_t>({0},1));
poolMap->emplace(datapool::TEST_UINT16,
new PoolEntry<uint16_t>({0},1));
poolMap->emplace(datapool::TEST_UINT32,
new PoolEntry<uint32_t>({0},1));
poolMap->emplace(datapool::TEST_FLOAT_VECTOR,
new PoolEntry<float>({0, 0},2));
}


@@ -0,0 +1,229 @@
#!/usr/bin/python3.8
"""
@file
mib_datapool_parser.py
@brief
Parses the global datapools and generates the corresponding source file optionally.
Python 3.8 required.
@details
Used by the MIB Exporter, inherits generic File Parser.
@author
R. Mueller
@date
03.01.2020
"""
import re
from enum import Enum
from datetime import date
from parserbase.mib_parser import FileParser
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer
from utility.mib_file_management import copy_file
DATE_TODAY = date.today()
DATAPOOL_FILE = "../../config/dataPool/dataPoolInit.h"
DATAPOOL_CSV_NAME = "mib_datapool.csv"
CPP_FILE_NAME = "dataPoolInit.cpp"
WRITE_CSV_FILE = True
COPY_CSV_FILE = True
WRITE_CPP_FILE = True
COPY_CPP_FILE = False
CPP_COPY_DESTINATION = "../../config/dataPool/"
DATAPOOL_HEADER_COLUMNS = ["Pool ID", "Group", "Name", "Code Name", "Size", "Type", "Unit"]
class DatapoolColumns(Enum):
"""
Specifies order of MIB columns
"""
POOL_ID = 0
GROUP = 1
NAME = 2
CODE_NAME = 3
SIZE = 4
TYPE = 5
UNIT = 6
Clmns = DatapoolColumns
def main():
"""
This main is run if the datapool parser is to be run separately.
:return:
"""
file_list = [DATAPOOL_FILE]
print("DatapoolParser: Parsing datapool header file:")
print(file_list[0])
datapool_parser = DatapoolParser(file_list)
datapool_table = datapool_parser.parse_files()
Printer.print_content(datapool_table, "DatapoolParser: Printing datapool variable table:")
dh_command_writer = CsvWriter(DATAPOOL_CSV_NAME, datapool_table, DATAPOOL_HEADER_COLUMNS)
if WRITE_CSV_FILE:
dh_command_writer.write_to_csv()
if COPY_CSV_FILE:
dh_command_writer.move_csv("..")
if WRITE_CPP_FILE:
datapool_parser.write_data_pool_init_cpp()
print("DatapoolParser: C++ File was created.")
if COPY_CPP_FILE:
copy_file(CPP_FILE_NAME, CPP_COPY_DESTINATION)
print("DatapoolParser: Generated C++ file was copied to " + CPP_COPY_DESTINATION)
class DatapoolParser(FileParser):
"""
This parser reads the central datapool header file.
It can optionally generate the corresponding source file (C++11 needed).
    Instantiate the class by supplying a list of files to parse, then call the parse_files() method.
:return:
"""
def __init__(self, file_list):
super().__init__(file_list)
# this table includes the current new table entry,
# which will be updated for target parameter
self.dict_entry_list = list(range(Clmns.__len__()))
self.cpp_file_information_list = []
self.parse_success = False
self.parse_for_datapool_entries = False
self.prev_group = ""
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r")
print("Parsing " + file_name + " ...")
linecount = 1
for line in file.readlines():
self.__handle_line_reading(line)
linecount = linecount + 1
def _post_parsing_operation(self):
if len(self.mib_table) > 0:
self.parse_success = True
def write_data_pool_init_cpp(self):
"""
Writes the data pool CPP source file if the parsing was successfull
:param copy_cpp:
:param cpp_file_name:
:param cpp_copy_destination:
:return:
"""
if not self.parse_success:
print("Datapool Parser: MIB Table is empty, no data to write CPP file")
return
cpp_file = open(CPP_FILE_NAME, "w")
current_date = DATE_TODAY.strftime("%d.%m.%Y")
header = "/**\n * @file\tdataPoolInit.cpp\n *\n * @brief\tAuto-Generated datapool " \
"initialization\n * @date\t" + current_date +\
"\n */\n#include <config/dataPool/dataPoolInit.h> " \
"\n\nvoid datapool::dataPoolInit(poolMap * poolMap) {\n"
cpp_file.write(header)
entries_len = self.index + 1
for index in range(1, entries_len):
self.__handle_entry_write(cpp_file, index)
tail = "\n}\n"
cpp_file.write(tail)
cpp_file.close()
def __handle_line_reading(self, line):
if self.parse_for_datapool_entries:
if not self.__scan_for_pool_variable(line):
self.__scan_for_export_string(line)
if not self.parse_for_datapool_entries:
datapool_start = re.search(r'[\s]*enum[\w_ ]*[\s]*{', line)
if datapool_start:
self.parse_for_datapool_entries = True
else:
self.__scan_for_datapool_end(line)
def __scan_for_export_string(self, line):
export_string_match = re.search(r'[/*!>< ]*\[EXPORT\][ :]*([^\n]*)', line, re.IGNORECASE)
if export_string_match:
self.__handle_export_string_match(export_string_match.group(1))
return export_string_match
def __handle_export_string_match(self, string):
group_match = re.search(r'\[GROUP\][\s]*([^\n\*]*)', string, re.IGNORECASE)
if group_match:
self.dict_entry_list[Clmns.GROUP.value] = group_match.group(1).rstrip()
def __scan_for_pool_variable(self, line):
pool_var_match = re.search(r'[\s]*([\w]*)[ =]*([\w]*)(?:,)?[\s]*([^\n]*)', line)
if pool_var_match:
if pool_var_match.group(1) == "":
return False
self.__handle_pool_var_match(pool_var_match)
return pool_var_match
def __handle_pool_var_match(self, pool_var_match):
if re.search(r'NO_PARAMETER', pool_var_match.group(0)):
return
self.dict_entry_list[Clmns.CODE_NAME.value] = pool_var_match.group(1)
self.dict_entry_list[Clmns.POOL_ID.value] = pool_var_match.group(2)
export_string_match = re.search(r'[/!< ]*\[EXPORT\][: ]*([^\n]*)',
pool_var_match.group(3), re.IGNORECASE)
if export_string_match:
self.__handle_pool_var_export_string(export_string_match.group(1))
datapool_tuple = tuple(self.dict_entry_list)
self.index = self.index + 1
self.mib_table.update({self.index: datapool_tuple})
self.dict_entry_list[Clmns.SIZE.value] = ""
self.dict_entry_list[Clmns.TYPE.value] = ""
self.dict_entry_list[Clmns.NAME.value] = ""
self.dict_entry_list[Clmns.UNIT.value] = ""
def __handle_pool_var_export_string(self, string):
extracted_entries = re.findall(r'(?:\[([\w]*)\][\s]*([^\[]*))?', string)
        if extracted_entries:
            extracted_entries_len = len(extracted_entries) - 1
            for group_index in range(extracted_entries_len):
(group_name, group_content) = extracted_entries[group_index]
group_content = group_content.rstrip()
if group_name.casefold() == "name":
self.dict_entry_list[Clmns.NAME.value] = group_content
elif group_name.casefold() == "size":
self.dict_entry_list[Clmns.SIZE.value] = group_content
elif group_name.casefold() == "type":
self.dict_entry_list[Clmns.TYPE.value] = group_content
elif group_name.casefold() == "unit":
self.dict_entry_list[Clmns.UNIT.value] = group_content
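    # Illustrative datapool entry this parser understands (hypothetical header line):
    #   GPS0_FIX_MODE = 0x3000, //!< [EXPORT] : [NAME] Fix Mode [SIZE] 1 [TYPE] uint8_t [UNIT] -
    # The code name and pool ID come from the declaration itself, while the [EXPORT]
    # tags fill the NAME, SIZE, TYPE and UNIT columns.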
def __scan_for_datapool_end(self, line):
datapool_end = re.search(r'}[\s]*;', line)
if datapool_end:
self.parse_for_datapool_entries = False
def __handle_entry_write(self, cpp_file, index):
current_mib_entry = self.mib_table.get(index)
(_, current_group, _, current_code_name,
current_size, current_type, _) = current_mib_entry
if current_group != self.prev_group:
cpp_file.write("\n\t/* " + current_group + " */\n")
self.prev_group = current_group
current_pool_entry_init_value = "{"
if current_size == "":
print("Size is unknown for a pool entry. Please specify in header file !")
return
current_size = int(current_size)
for count in range(current_size):
current_pool_entry_init_value = current_pool_entry_init_value + "0"
if count != current_size - 1:
current_pool_entry_init_value = current_pool_entry_init_value + ", "
current_pool_entry_init_value = current_pool_entry_init_value + "}"
entry_string = "\tpoolMap->emplace(datapool::" + current_code_name + \
",\n\t\t\tnew PoolEntry<" + current_type + ">(" + \
current_pool_entry_init_value + "," + str(current_size) + "));\n"
cpp_file.write(entry_string)
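    # Illustrative output for a hypothetical row with code name GPS0_FIX_MODE,
    # size 1 and type uint8_t:
    #   poolMap->emplace(datapool::GPS0_FIX_MODE,
    #           new PoolEntry<uint8_t>({0},1));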
if __name__ == "__main__":
main()


@@ -0,0 +1,385 @@
#!/usr/bin/python3.7
"""
@file mib_device_command_parser.py
@brief Parses the device commands which are used for the PUS Service 8 as the primary means
of satellite commanding.
@details Used by the MIB Exporter, inherits generic File Parser.
Also has information parser which parses the possible device handler command values
from the actual device handlers.
@author R. Mueller
"""
import re
from enum import Enum
from parserbase.mib_file_list_parser import FileListParser
from parserbase.mib_parser import FileParser
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer
DH_COMMAND_PACKET_DEFINITION_DESTINATION = "../../mission/devices/devicepackets/"
DH_DEFINITION_DESTINATION = "../../mission/devices/"
DH_COMMANDS_CSV_NAME = "mib_device_commands.csv"
DH_COMMAND_HEADER_COLUMNS = [
"Device Handler", "Command Name", "Action ID", "Command Field Name", "Command Field Position",
"Command Field Type", "Command Field Option Name", "Command Field Option Value", "Comment"]
SQL_DELETE_CMDTABLE_CMD = """
DROP TABLE IF EXISTS DeviceHandlerCommand;
"""
SQL_CREATE_CMDTABLE_CMD = """
CREATE TABLE IF NOT EXISTS DeviceHandlerCommand(
id INTEGER PRIMARY KEY,
deviceHandler TEXT,
commandName TEXT,
actionID INTEGER,
cmdFieldName TEXT,
cmdFieldPos INTEGER,
cmdFieldType TEXT,
cmdFieldOptName TEXT,
cmdFieldOptVal INTEGER,
    comment TEXT
)
"""
SQL_INSERT_INTO_CMDTABLE_CMD = """
INSERT INTO DeviceHandlerCommand(deviceHandler,commandName,actionID,cmdFieldName,cmdFieldPos,
cmdFieldType,cmdFieldOptName,cmdFieldOptVal,comment)
VALUES(?,?,?,?,?,?,?,?,?)
"""
class DeviceCommandColumns(Enum):
"""
Specifies order of MIB columns
"""
DH_NAME = 0
NAME = 1
ACTION_ID = 2
COMMAND_FIELD_NAME = 3
COMMAND_INDEX = 4
TYPE = 5
COMMAND_FIELD_OPTION_NAME = 6
COMMAND_FIELD_OPTION_VALUE = 7
COMMAND_FIELD_COMMENT = 8
Clmns = DeviceCommandColumns
def main():
"""
The main routine is run if the device command parser is run separately.
:return:
"""
info_header_file_parser = FileListParser(DH_DEFINITION_DESTINATION)
    info_header_file_list = info_header_file_parser.\
        parse_header_files(False, "Parsing device handler information:")
    dh_information_parser = DeviceHandlerInformationParser(info_header_file_list)
    dh_information_table = dh_information_parser.parse_files()
    Printer.print_content(dh_information_table, "Printing device handler command information table: ")
header_file_parser = FileListParser(DH_COMMAND_PACKET_DEFINITION_DESTINATION)
header_file_list = \
header_file_parser.parse_header_files(False, "Parsing device handler command files:")
packet_subservice_parser = DeviceHandlerCommandParser(header_file_list, dh_information_table)
dh_command_table = packet_subservice_parser.parse_files()
Printer.print_content(dh_command_table, "Printing device handler command table:")
dh_command_writer = CsvWriter(DH_COMMANDS_CSV_NAME, dh_command_table, DH_COMMAND_HEADER_COLUMNS)
dh_command_writer.write_to_csv()
dh_command_writer.copy_csv()
dh_command_writer.move_csv("..")
# pylint: disable=too-few-public-methods
class DeviceHandlerInformationParser(FileParser):
"""
    This helper class parses device handler information from the device handler
    header files. This information can be used to map commands to the device handler packets later.
"""
def __init__(self, fileList):
super().__init__(fileList)
self.command_dict = dict()
self.command_enum_dict = dict()
self.command_enum_name = ""
self.command_value_name_list = []
self.command_value_list = []
self.command_comment_list = []
# this table includes the current new table entry, which will be updated
# for target parameter
self.command_scanning_pending = False
# This is called for every file. Fill out info table in this routine
def _handle_file_parsing(self, file_name, *args):
self_print_parsing_info = False
if len(args) == 1 and isinstance(args[0], bool):
self_print_parsing_info = args[0]
# Read device name from file name
        handler_match = re.search(r'([\w]*)\.h', file_name)
        if not handler_match:
            print("Device Command Parser: Configuration error, no handler name match!")
            return
        handler_name = handler_match.group(1)
file = open(file_name, "r")
if self_print_parsing_info:
print("Parsing " + file_name + " ...")
# Scans each line for possible device handler command enums
for line in file.readlines():
self.__handle_line_reading(line)
handler_tuple = (self.command_dict, self.command_enum_dict)
handler_dict = dict()
handler_dict.update({handler_name: handler_tuple})
self.mib_table.update(handler_dict)
self.command_dict = dict()
self.command_enum_dict = dict()
def __handle_line_reading(self, line):
"""
Searches for enum command definitions or device command definitions.
:param line:
:return:
"""
# Case insensitive matching of device command enums
enum_match = re.search(r'[\s]*enum[\s]*([\w]*)[\s]*{[\s][/!<>]*[\s]*'
r'\[EXPORT[\w]*\][\s]*:[\s]*\[ENUM\]([^\n]*)', line, re.IGNORECASE)
if enum_match:
self.command_enum_name = enum_match.group(1)
self.command_scanning_pending = True
else:
self.__handle_command_definition_scanning(line)
# while command scanning is pending, each line in enum needs to be parsed
if self.command_scanning_pending:
self.__handle_command_enum_scanning(line)
def __handle_command_definition_scanning(self, line):
command_match = \
re.search(r'[\s]*static[\s]*const[\s]*DeviceCommandId_t[\s]*([\w]*)[\s]*=[\s]*'
r'([\w]*)[\s]*;[\s]*[/!<>]*[\s]*\[EXPORT\][\s]*:[\s]*\[COMMAND\]', line)
if command_match:
command_name = command_match.group(1)
command_id = command_match.group(2)
self.command_dict.update({command_name: command_id})
def __handle_command_enum_scanning(self, line):
self.__scan_command_entries(line)
if not self.command_scanning_pending:
# scanning enum finished
# stores current command into command dictionary with command name as unique key
command_tuple = self.command_value_name_list, self.command_value_list, \
self.command_comment_list
self.command_enum_dict.update({self.command_enum_name: command_tuple})
self.command_enum_name = ""
self.command_value_name_list = []
self.command_value_list = []
self.command_comment_list = []
def __scan_command_entries(self, line):
command_match = \
re.search(r'[\s]*([\w]*)[\s]*=[\s]*([0-9]{1,3})[^/][\s]*[/!<>]*[\s]*([^\n]*)', line)
if command_match:
self.command_value_name_list.append(command_match.group(1))
self.command_value_list.append(command_match.group(2))
self.command_comment_list.append(command_match.group(3))
elif re.search(r'}[\s]*;', line):
self.command_scanning_pending = False
def _post_parsing_operation(self):
pass
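# Illustrative device handler header content this parser scans (hypothetical snippet):
#   enum ModeList { //!< [EXPORT] : [ENUM] Available mode options
#       MODE_ON = 0, //!< Device on
#       MODE_OFF = 1 //!< Device off
#   };
#   static const DeviceCommandId_t SWITCH_MODE = 0x01; //!< [EXPORT] : [COMMAND]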
class PendingScanType(Enum):
"""
Specifies which scan type is performed in the device command parser.
"""
NO_SCANNING = 0
STRUCT_SCAN = 1
CLASS_SCAN = 2
# pylint: disable=too-many-instance-attributes
class DeviceHandlerCommandParser(FileParser):
"""
This is the actual device handler command parser. It will parse the device handler
packet definitions. A device handler info table must be passed which can be acquired
by running the DH information parser.
"""
def __init__(self, file_list, dh_information_table):
super().__init__(file_list)
# this table includes the current new table entry,
# which will be updated for target parameter
self.dict_entry_list = list(range(Clmns.__len__()))
        # This table contains information about the respective device handler command options
self.dh_information_table = dh_information_table
self.enum_dict = dict()
self.current_enum_name = ""
self.comment = ""
self.command_comment = ""
self.command_index = 0
self.scanning_pending = PendingScanType.NO_SCANNING.value
# This is called for every file, fill out mib_table
def _handle_file_parsing(self, file_name, *args):
self_print_parsing_info = False
if len(args) == 1 and isinstance(args[0], bool):
self_print_parsing_info = args[0]
file = open(file_name, "r")
if self_print_parsing_info:
print("Parsing " + file_name + " ...")
# Scans each line for possible device handler command enums
for line in file.readlines():
self.__handle_line_reading(line)
def __handle_line_reading(self, line: str):
"""
Search for struct command definition
:param line:
:return:
"""
self.__scan_for_commands(line)
# if self.struct_scanning_pending:
def __scan_for_commands(self, line):
# Search for struct command definition
struct_found = self.__scan_for_structs(line)
if not struct_found:
self.__scan_for_class(line)
if self.scanning_pending is not PendingScanType.NO_SCANNING.value:
self.__scan_command(line)
def __scan_for_structs(self, line):
struct_match = re.search(r'[\s]*struct[\s]*([\w]*)[\s]*{[\s]*[/!<>]*[\s]*'
r'\[EXPORT\][ :]*\[COMMAND\]'
r'[\s]*([\w]*)[ :]*([\w]*)', line)
if struct_match:
# Scan a found command struct
self.__start_class_or_struct_scanning(struct_match)
self.scanning_pending = PendingScanType.STRUCT_SCAN.value
return struct_match
def __scan_for_class(self, line):
# search for class command definition
class_match = re.search(r'[\s]*class[\s]*([\w]*)[\s]*[^{]*{[ /!<>]*\[EXPORT\][ :]*'
r'\[COMMAND\][\s]*([\w]*)[ :]*([\w]*)', line)
if class_match:
self.__start_class_or_struct_scanning(class_match)
self.scanning_pending = PendingScanType.CLASS_SCAN.value
def __start_class_or_struct_scanning(self, command_match):
"""
Stores and assigns values that are the same for each command field option
:param command_match:
:return:
"""
handler_name = command_match.group(2)
self.dict_entry_list[Clmns.DH_NAME.value] = handler_name
self.dict_entry_list[Clmns.NAME.value] = command_match.group(1)
command_name = command_match.group(3)
if handler_name in self.dh_information_table:
(command_id_dict, self.enum_dict) = self.dh_information_table[handler_name]
if command_name in command_id_dict:
self.dict_entry_list[Clmns.ACTION_ID.value] = command_id_dict[command_name]
def __scan_command(self, line):
datatype_match = False
if self.scanning_pending is PendingScanType.STRUCT_SCAN.value:
datatype_match = \
re.search(r'[\s]*(uint[0-9]{1,2}_t|float|double|bool|int|char)[\s]*([\w]*);'
r'(?:[\s]*[/!<>]*[\s]*\[EXPORT\][: ]*(.*))?', line)
elif self.scanning_pending is PendingScanType.CLASS_SCAN.value:
datatype_match = re.search(
r'[\s]*SerializeElement[\s]*<(uint[0-9]{1,2}_t|float|double|bool|int|char)[ >]*'
r'([\w]*);(?:[ /!<>]*\[EXPORT\][: ]*(.*))?', line)
if datatype_match:
self.__handle_datatype_match(datatype_match)
elif re.search(r'}[\s]*;', line):
self.scanning_pending = PendingScanType.NO_SCANNING.value
self.command_index = 0
def __handle_datatype_match(self, datatype_match):
self.dict_entry_list[Clmns.TYPE.value] = datatype_match.group(1)
self.dict_entry_list[Clmns.COMMAND_FIELD_NAME.value] = datatype_match.group(2)
size_of_enum = 0
if datatype_match.group(3) is not None:
self.__analyse_exporter_sequence(datatype_match.group(3))
if self.current_enum_name != "":
size_of_enum = self.__get_enum_size()
self.__update_device_command_dict(size_of_enum)
    def __analyse_exporter_sequence(self, exporter_sequence):
        # This matches the exporter sequence pairs e.g. [ENUM] BLA [COMMENT] BLABLA [...] ...
        # re.findall is used instead of re.search so that all pairs are caught,
        # not just the first one.
        export_string_matches = re.findall(r'(?:\[([\w]*)\][\s]*([^\[]*))?', exporter_sequence)
        for (sequence_type, sequence_entry) in export_string_matches:
            if sequence_type == "":
                continue
            self.__handle_sequence_pair(sequence_type, sequence_entry.rstrip())
def __handle_sequence_pair(self, sequence_type, sequence_entry):
if sequence_type.casefold() == "enum":
self.current_enum_name = sequence_entry
elif sequence_type.casefold() == "comment":
self.command_comment = sequence_entry
def __get_enum_size(self) -> int:
if self.current_enum_name in self.enum_dict:
size_of_enum = len(self.enum_dict[self.current_enum_name][1])
return size_of_enum
return 0
def __update_device_command_dict(self, size_of_enum: int = 0):
if size_of_enum > 0:
enum_tuple = self.enum_dict[self.current_enum_name]
for count in range(0, size_of_enum):
self.__update_table_with_command_options(count, enum_tuple)
                self.command_index = self.command_index + 1
else:
self.__update_table_with_no_command_options()
self.index = self.index + 1
self.current_enum_name = ""
def __update_table_with_command_options(self, count, enum_tuple):
enum_value_name_list, enum_value_list, enum_comment_list = enum_tuple
self.dict_entry_list[Clmns.COMMAND_FIELD_OPTION_NAME.value] = \
enum_value_name_list[count]
self.dict_entry_list[Clmns.COMMAND_FIELD_OPTION_VALUE.value] = enum_value_list[count]
self.dict_entry_list[Clmns.COMMAND_FIELD_COMMENT.value] = enum_comment_list[count]
        self.dict_entry_list[Clmns.COMMAND_INDEX.value] = self.command_index
dh_command_tuple = tuple(self.dict_entry_list)
self.index += 1
self.mib_table.update({self.index: dh_command_tuple})
def __update_table_with_no_command_options(self):
self.dict_entry_list[Clmns.COMMAND_FIELD_OPTION_NAME.value] = ""
self.dict_entry_list[Clmns.COMMAND_FIELD_OPTION_VALUE.value] = ""
self.dict_entry_list[Clmns.COMMAND_FIELD_COMMENT.value] = self.command_comment
        self.dict_entry_list[Clmns.COMMAND_INDEX.value] = self.command_index
dh_command_tuple = tuple(self.dict_entry_list)
self.mib_table.update({self.index: dh_command_tuple})
self.command_index += 1
def _post_parsing_operation(self):
pass
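# Illustrative packet definition the command parser scans (hypothetical snippet):
#   struct SwitchModeCommand { //!< [EXPORT] : [COMMAND] SomeDeviceHandler : SWITCH_MODE
#       uint8_t mode; //!< [EXPORT] : [ENUM] ModeList [COMMENT] Target mode
#   };
# Each field yields one MIB row per enum option, or a single row if no enum is referenced.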
if __name__ == "__main__":
main()


@@ -0,0 +1,223 @@
#! /usr/bin/python3.8
"""
@file mib_events.py
@brief Part of the Mission Information Base Exporter for the SOURCE project by KSat.
@details
Event exporter.
To use MySQLdb, run pip install mysqlclient or install it via the IDE.
On Windows, installing the Build Tools might be necessary.
@date 21.11.2019
"""
import re
import datetime
from parserbase.mib_file_list_parser import FileListParser
from parserbase.mib_parser import FileParser
from utility.mib_printer import PrettyPrinter
from utility.mib_file_management import copy_file, move_file
DATE_TODAY = datetime.datetime.now()
DATE_STRING_FULL = DATE_TODAY.strftime("%Y-%m-%d %H:%M:%S")
GENERATE_CPP = True
GENERATE_CSV = True
COPY_CPP_FILE = True
MOVE_CSV_FILE = True
CSV_FILENAME = "mib_events.csv"
CSV_MOVE_DESTINATION = "../"
CPP_FILENAME = "translateEvents.cpp"
CPP_COPY_DESTINATION = "../../config/events/"
FILE_SEPARATOR = ";"
SUBSYSTEM_DEFINITION_DESTINATIONS = ["../../config/tmtc/subsystemIdRanges.h",
"../../fsfw/events/fwSubsystemIdRanges.h"]
HEADER_DEFINITION_DESTINATIONS = ["../../mission/", "../../fsfw/"]
def main():
print("EventParser: Parsing events: ")
event_list = parse_events()
if GENERATE_CSV:
handle_csv_export(CSV_FILENAME, event_list)
if MOVE_CSV_FILE:
move_file(CSV_FILENAME, CSV_MOVE_DESTINATION)
if GENERATE_CPP:
handle_cpp_export(CPP_FILENAME, event_list)
if COPY_CPP_FILE:
print("EventParser: Copying file to " + CPP_COPY_DESTINATION)
copy_file(CPP_FILENAME, CPP_COPY_DESTINATION)
print("")
def parse_events():
subsystem_parser = SubsystemDefinitionParser(SUBSYSTEM_DEFINITION_DESTINATIONS)
subsystem_table = subsystem_parser.parse_files()
print("Found " + str(len(subsystem_table)) + " subsystem definitions.")
PrettyPrinter.pprint(subsystem_table)
event_header_parser = FileListParser(HEADER_DEFINITION_DESTINATIONS)
event_headers = event_header_parser.parse_header_files(
True, "Parsing event header file list:\n", True)
# g.PP.pprint(event_headers)
# myEventList = parseHeaderFiles(subsystem_table, event_headers)
event_parser = EventParser(event_headers, subsystem_table)
event_table = event_parser.parse_files()
list_items = sorted(event_table.items())
print("Found " + str(len(list_items)) + " entries:")
PrettyPrinter.pprint(list_items)
return list_items
class SubsystemDefinitionParser(FileParser):
def __init__(self, file_list):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r")
for line in file.readlines():
match = re.search(r'([A-Z0-9_]*) = ([0-9]{1,2})', line)
if match:
self.mib_table.update({match.group(1): [match.group(2)]})
def _post_parsing_operation(self):
pass
class EventParser(FileParser):
def __init__(self, file_list, interface_list):
super().__init__(file_list)
self.interfaces = interface_list
        self.count = 0
        self.my_id = 0
        self.current_id = 0
self.last_lines = ["", "", ""]
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
try:
file = open(file_name, 'r', encoding='utf-8')
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
all_lines = file.readlines()
total_count = 0
for line in all_lines:
self.__handle_line_reading(line, file_name)
if self.count > 0:
print("File " + file_name + " contained " + str(self.count) + " events.")
total_count += self.count
self.count = 0
def _post_parsing_operation(self):
pass
def __handle_line_reading(self, line, file_name):
if not self.last_lines[0] == '\n':
twolines = self.last_lines[0] + ' ' + line.strip()
else:
twolines = ''
        match1 = re.search(r'SUBSYSTEM_ID[\s]*=[\s]*SUBSYSTEM_ID::([A-Z_0-9]*);', twolines)
        if match1:
            self.current_id = self.interfaces[match1.group(1)][0]
            # print("Current ID: " + str(self.current_id))
            self.my_id = self.return_number_from_string(self.current_id)
        match = re.search(
            r'(//)?[\t ]*static const(?:expr)? Event[\s]*([A-Z_0-9]*)[\s]*=[\s]*'
            r'MAKE_EVENT\(([0-9]{1,2}),[\s]*SEVERITY::([A-Z]*)\);[\t ]*(//!<)?([^\n]*)', twolines)
if match:
if match.group(1):
self.last_lines[0] = line
return
description = " "
if match.group(6):
description = self.clean_up_description(match.group(6))
string_to_add = match.group(2)
            full_id = (self.my_id * 100) + self.return_number_from_string(match.group(3))
severity = match.group(4)
if full_id in self.mib_table:
# print("EventParser: Duplicate Event " + hex(full_id) + " from " + file_name +
# " was already in " + self.mib_table[full_id][3])
pass
self.mib_table.update({full_id: (string_to_add, severity, description, file_name)})
self.count = self.count + 1
self.last_lines[0] = line
def build_checked_string(self, first_part, second_part):
my_str = first_part + self.convert(second_part)
if len(my_str) > 16:
print("EventParser: Entry: " + my_str + " too long. Will truncate.")
my_str = my_str[0:14]
# else:
# print( "Entry: " + myStr + " is all right.")
return my_str
@staticmethod
def return_number_from_string(a_string):
if a_string.startswith('0x'):
return int(a_string, 16)
elif a_string.isdigit():
return int(a_string)
else:
print('EventParser: Illegal number representation: ' + a_string)
return 0
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(description):
description = description.lstrip('//!<>')
description = description.lstrip()
if description == '':
description = ' '
return description
def export_to_file(filename, list_of_entries):
print("EventParser: Exporting to file: " + filename)
file = open(filename, "w")
for entry in list_of_entries:
file.write(str(entry[0]) + FILE_SEPARATOR + entry[1][0] + FILE_SEPARATOR + entry[1][1]
+ FILE_SEPARATOR + entry[1][2] + FILE_SEPARATOR + entry[1][3] + '\n')
file.close()
return
def write_translation_file(filename, list_of_entries):
outputfile = open(filename, "w")
definitions = ""
function = "const char * translateEvents(Event event){\n\tswitch((event&0xFFFF)){\n"
for entry in list_of_entries:
definitions += "const char *" + entry[1][0] + "_STRING = \"" + entry[1][0] + "\";\n"
function += "\t\tcase " + str(entry[0]) + ":\n\t\t\treturn " + entry[1][0] + "_STRING;\n"
function += '\t\tdefault:\n\t\t\treturn "UNKNOWN_EVENT";\n'
outputfile.write("/**\n * @brief Auto-generated event translation file. "
"Contains " + str(len(list_of_entries)) + " translations.\n"
" * Generated on: " + DATE_STRING_FULL +
" \n */\n")
outputfile.write("#include \"translateEvents.h\"\n\n")
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
def handle_csv_export(file_name: str, list_items: list):
"""
    Generates the CSV in the same directory as the .py file and copies the CSV to another
directory if specified.
"""
export_to_file(file_name, list_items)
def handle_cpp_export(file_name: str, list_items):
print("EventParser: Generating translation cpp file.")
write_translation_file(file_name, list_items)
if __name__ == "__main__":
main()


@@ -0,0 +1,248 @@
/**
* @brief Auto-generated event translation file. Contains 78 translations.
* Generated on: 2020-09-30 15:17:26
*/
#include "translateEvents.h"
const char *STORE_SEND_WRITE_FAILED_STRING = "STORE_SEND_WRITE_FAILED";
const char *STORE_WRITE_FAILED_STRING = "STORE_WRITE_FAILED";
const char *STORE_SEND_READ_FAILED_STRING = "STORE_SEND_READ_FAILED";
const char *STORE_READ_FAILED_STRING = "STORE_READ_FAILED";
const char *UNEXPECTED_MSG_STRING = "UNEXPECTED_MSG";
const char *STORING_FAILED_STRING = "STORING_FAILED";
const char *TM_DUMP_FAILED_STRING = "TM_DUMP_FAILED";
const char *STORE_INIT_FAILED_STRING = "STORE_INIT_FAILED";
const char *STORE_INIT_EMPTY_STRING = "STORE_INIT_EMPTY";
const char *STORE_CONTENT_CORRUPTED_STRING = "STORE_CONTENT_CORRUPTED";
const char *STORE_INITIALIZE_STRING = "STORE_INITIALIZE";
const char *INIT_DONE_STRING = "INIT_DONE";
const char *DUMP_FINISHED_STRING = "DUMP_FINISHED";
const char *DELETION_FINISHED_STRING = "DELETION_FINISHED";
const char *DELETION_FAILED_STRING = "DELETION_FAILED";
const char *AUTO_CATALOGS_SENDING_FAILED_STRING = "AUTO_CATALOGS_SENDING_FAILED";
const char *GET_DATA_FAILED_STRING = "GET_DATA_FAILED";
const char *STORE_DATA_FAILED_STRING = "STORE_DATA_FAILED";
const char *DEVICE_BUILDING_COMMAND_FAILED_STRING = "DEVICE_BUILDING_COMMAND_FAILED";
const char *DEVICE_SENDING_COMMAND_FAILED_STRING = "DEVICE_SENDING_COMMAND_FAILED";
const char *DEVICE_REQUESTING_REPLY_FAILED_STRING = "DEVICE_REQUESTING_REPLY_FAILED";
const char *DEVICE_READING_REPLY_FAILED_STRING = "DEVICE_READING_REPLY_FAILED";
const char *DEVICE_INTERPRETING_REPLY_FAILED_STRING = "DEVICE_INTERPRETING_REPLY_FAILED";
const char *DEVICE_MISSED_REPLY_STRING = "DEVICE_MISSED_REPLY";
const char *DEVICE_UNKNOWN_REPLY_STRING = "DEVICE_UNKNOWN_REPLY";
const char *DEVICE_UNREQUESTED_REPLY_STRING = "DEVICE_UNREQUESTED_REPLY";
const char *INVALID_DEVICE_COMMAND_STRING = "INVALID_DEVICE_COMMAND";
const char *MONITORING_LIMIT_EXCEEDED_STRING = "MONITORING_LIMIT_EXCEEDED";
const char *MONITORING_AMBIGUOUS_STRING = "MONITORING_AMBIGUOUS";
const char *FUSE_CURRENT_HIGH_STRING = "FUSE_CURRENT_HIGH";
const char *FUSE_WENT_OFF_STRING = "FUSE_WENT_OFF";
const char *POWER_ABOVE_HIGH_LIMIT_STRING = "POWER_ABOVE_HIGH_LIMIT";
const char *POWER_BELOW_LOW_LIMIT_STRING = "POWER_BELOW_LOW_LIMIT";
const char *SWITCH_WENT_OFF_STRING = "SWITCH_WENT_OFF";
const char *HEATER_ON_STRING = "HEATER_ON";
const char *HEATER_OFF_STRING = "HEATER_OFF";
const char *HEATER_TIMEOUT_STRING = "HEATER_TIMEOUT";
const char *HEATER_STAYED_ON_STRING = "HEATER_STAYED_ON";
const char *HEATER_STAYED_OFF_STRING = "HEATER_STAYED_OFF";
const char *TEMP_SENSOR_HIGH_STRING = "TEMP_SENSOR_HIGH";
const char *TEMP_SENSOR_LOW_STRING = "TEMP_SENSOR_LOW";
const char *TEMP_SENSOR_GRADIENT_STRING = "TEMP_SENSOR_GRADIENT";
const char *COMPONENT_TEMP_LOW_STRING = "COMPONENT_TEMP_LOW";
const char *COMPONENT_TEMP_HIGH_STRING = "COMPONENT_TEMP_HIGH";
const char *COMPONENT_TEMP_OOL_LOW_STRING = "COMPONENT_TEMP_OOL_LOW";
const char *COMPONENT_TEMP_OOL_HIGH_STRING = "COMPONENT_TEMP_OOL_HIGH";
const char *TEMP_NOT_IN_OP_RANGE_STRING = "TEMP_NOT_IN_OP_RANGE";
const char *FDIR_CHANGED_STATE_STRING = "FDIR_CHANGED_STATE";
const char *FDIR_STARTS_RECOVERY_STRING = "FDIR_STARTS_RECOVERY";
const char *FDIR_TURNS_OFF_DEVICE_STRING = "FDIR_TURNS_OFF_DEVICE";
const char *MONITOR_CHANGED_STATE_STRING = "MONITOR_CHANGED_STATE";
const char *VALUE_BELOW_LOW_LIMIT_STRING = "VALUE_BELOW_LOW_LIMIT";
const char *VALUE_ABOVE_HIGH_LIMIT_STRING = "VALUE_ABOVE_HIGH_LIMIT";
const char *VALUE_OUT_OF_RANGE_STRING = "VALUE_OUT_OF_RANGE";
const char *SWITCHING_TM_FAILED_STRING = "SWITCHING_TM_FAILED";
const char *CHANGING_MODE_STRING = "CHANGING_MODE";
const char *MODE_INFO_STRING = "MODE_INFO";
const char *FALLBACK_FAILED_STRING = "FALLBACK_FAILED";
const char *MODE_TRANSITION_FAILED_STRING = "MODE_TRANSITION_FAILED";
const char *CANT_KEEP_MODE_STRING = "CANT_KEEP_MODE";
const char *OBJECT_IN_INVALID_MODE_STRING = "OBJECT_IN_INVALID_MODE";
const char *FORCING_MODE_STRING = "FORCING_MODE";
const char *MODE_CMD_REJECTED_STRING = "MODE_CMD_REJECTED";
const char *HEALTH_INFO_STRING = "HEALTH_INFO";
const char *CHILD_CHANGED_HEALTH_STRING = "CHILD_CHANGED_HEALTH";
const char *CHILD_PROBLEMS_STRING = "CHILD_PROBLEMS";
const char *OVERWRITING_HEALTH_STRING = "OVERWRITING_HEALTH";
const char *TRYING_RECOVERY_STRING = "TRYING_RECOVERY";
const char *RECOVERY_STEP_STRING = "RECOVERY_STEP";
const char *RECOVERY_DONE_STRING = "RECOVERY_DONE";
const char *RF_AVAILABLE_STRING = "RF_AVAILABLE";
const char *RF_LOST_STRING = "RF_LOST";
const char *BIT_LOCK_STRING = "BIT_LOCK";
const char *BIT_LOCK_LOST_STRING = "BIT_LOCK_LOST";
const char *FRAME_PROCESSING_FAILED_STRING = "FRAME_PROCESSING_FAILED";
const char *CLOCK_SET_STRING = "CLOCK_SET";
const char *CLOCK_SET_FAILURE_STRING = "CLOCK_SET_FAILURE";
const char *TEST_STRING = "TEST";
const char * translateEvents(Event event){
switch((event&0xFFFF)){
case 2200:
return STORE_SEND_WRITE_FAILED_STRING;
case 2201:
return STORE_WRITE_FAILED_STRING;
case 2202:
return STORE_SEND_READ_FAILED_STRING;
case 2203:
return STORE_READ_FAILED_STRING;
case 2204:
return UNEXPECTED_MSG_STRING;
case 2205:
return STORING_FAILED_STRING;
case 2206:
return TM_DUMP_FAILED_STRING;
case 2207:
return STORE_INIT_FAILED_STRING;
case 2208:
return STORE_INIT_EMPTY_STRING;
case 2209:
return STORE_CONTENT_CORRUPTED_STRING;
case 2210:
return STORE_INITIALIZE_STRING;
case 2211:
return INIT_DONE_STRING;
case 2212:
return DUMP_FINISHED_STRING;
case 2213:
return DELETION_FINISHED_STRING;
case 2214:
return DELETION_FAILED_STRING;
case 2215:
return AUTO_CATALOGS_SENDING_FAILED_STRING;
case 2600:
return GET_DATA_FAILED_STRING;
case 2601:
return STORE_DATA_FAILED_STRING;
case 2800:
return DEVICE_BUILDING_COMMAND_FAILED_STRING;
case 2801:
return DEVICE_SENDING_COMMAND_FAILED_STRING;
case 2802:
return DEVICE_REQUESTING_REPLY_FAILED_STRING;
case 2803:
return DEVICE_READING_REPLY_FAILED_STRING;
case 2804:
return DEVICE_INTERPRETING_REPLY_FAILED_STRING;
case 2805:
return DEVICE_MISSED_REPLY_STRING;
case 2806:
return DEVICE_UNKNOWN_REPLY_STRING;
case 2807:
return DEVICE_UNREQUESTED_REPLY_STRING;
case 2808:
return INVALID_DEVICE_COMMAND_STRING;
case 2809:
return MONITORING_LIMIT_EXCEEDED_STRING;
case 2810:
return MONITORING_AMBIGUOUS_STRING;
case 4201:
return FUSE_CURRENT_HIGH_STRING;
case 4202:
return FUSE_WENT_OFF_STRING;
case 4204:
return POWER_ABOVE_HIGH_LIMIT_STRING;
case 4205:
return POWER_BELOW_LOW_LIMIT_STRING;
case 4300:
return SWITCH_WENT_OFF_STRING;
case 5000:
return HEATER_ON_STRING;
case 5001:
return HEATER_OFF_STRING;
case 5002:
return HEATER_TIMEOUT_STRING;
case 5003:
return HEATER_STAYED_ON_STRING;
case 5004:
return HEATER_STAYED_OFF_STRING;
case 5200:
return TEMP_SENSOR_HIGH_STRING;
case 5201:
return TEMP_SENSOR_LOW_STRING;
case 5202:
return TEMP_SENSOR_GRADIENT_STRING;
case 5901:
return COMPONENT_TEMP_LOW_STRING;
case 5902:
return COMPONENT_TEMP_HIGH_STRING;
case 5903:
return COMPONENT_TEMP_OOL_LOW_STRING;
case 5904:
return COMPONENT_TEMP_OOL_HIGH_STRING;
case 5905:
return TEMP_NOT_IN_OP_RANGE_STRING;
case 7101:
return FDIR_CHANGED_STATE_STRING;
case 7102:
return FDIR_STARTS_RECOVERY_STRING;
case 7103:
return FDIR_TURNS_OFF_DEVICE_STRING;
case 7201:
return MONITOR_CHANGED_STATE_STRING;
case 7202:
return VALUE_BELOW_LOW_LIMIT_STRING;
case 7203:
return VALUE_ABOVE_HIGH_LIMIT_STRING;
case 7204:
return VALUE_OUT_OF_RANGE_STRING;
case 7301:
return SWITCHING_TM_FAILED_STRING;
case 7400:
return CHANGING_MODE_STRING;
case 7401:
return MODE_INFO_STRING;
case 7402:
return FALLBACK_FAILED_STRING;
case 7403:
return MODE_TRANSITION_FAILED_STRING;
case 7404:
return CANT_KEEP_MODE_STRING;
case 7405:
return OBJECT_IN_INVALID_MODE_STRING;
case 7406:
return FORCING_MODE_STRING;
case 7407:
return MODE_CMD_REJECTED_STRING;
case 7506:
return HEALTH_INFO_STRING;
case 7507:
return CHILD_CHANGED_HEALTH_STRING;
case 7508:
return CHILD_PROBLEMS_STRING;
case 7509:
return OVERWRITING_HEALTH_STRING;
case 7510:
return TRYING_RECOVERY_STRING;
case 7511:
return RECOVERY_STEP_STRING;
case 7512:
return RECOVERY_DONE_STRING;
case 7900:
return RF_AVAILABLE_STRING;
case 7901:
return RF_LOST_STRING;
case 7902:
return BIT_LOCK_STRING;
case 7903:
return BIT_LOCK_LOST_STRING;
case 7905:
return FRAME_PROCESSING_FAILED_STRING;
case 8900:
return CLOCK_SET_STRING;
case 8901:
return CLOCK_SET_FAILURE_STRING;
case 9700:
return TEST_STRING;
default:
return "UNKNOWN_EVENT";
}
return 0;
}

generators/mib_exporter.py

@@ -0,0 +1,337 @@
#! /usr/bin/python3.8
# -*- coding: utf-8 -*-
"""
@file mib_exporter.py
@brief Mission Information Base Exporter for the SOURCE project by KSat.
@details
Parses OBSW which is based on FSFW developed by the Institute of Space Systems (IRS) Stuttgart.
Python 3.8 required
This exporter generates the MIB from the SOURCE On-Board Software directly
by using file parser implementations
This exporter has the following capabilities:
1. Export MIB tables to CSV files
2. Export MIB tables into a SQL database
This exporter currently has parsers for the following data:
1. Objects
2. Returnvalues
3. Packet content (Telemetry/Telecommands)
4. Events
5. Subservices
6. Device Commands
7. Global datapool
@developers
Basic instructions to implement a new parser:
This parser uses a generic parser class. A specific parser implementation
can be built by implementing the generic parser class.
The parser generally takes a list with all files to parse and a dictionary
with the structure of the MIB table.
This website can be used to experiment with regular expressions: https://regex101.com/
TODO:
1. Maybe make this file object oriented too.
"""
import os
import pprint
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer, PrettyPrinter
from utility.mib_sql_writer import SqlWriter
from utility import mib_globals as g
from parserbase.mib_file_list_parser import FileListParser
from packetcontent.mib_packet_content_parser import (
PacketContentParser,
PACKET_CONTENT_DEFINITION_DESTINATION,
PACKET_CONTENT_CSV_NAME,
PACKET_CONTENT_HEADER_COLUMN,
SQL_CREATE_PACKET_DATA_CONTENT_CMD,
SQL_INSERT_PACKET_DATA_CMD,
SQL_DELETE_PACKET_DATA_CONTENT_CMD
)
from subservice.mib_subservice_parser import (
SubserviceParser,
SUBSERVICE_DEFINITION_DESTINATION,
SUBSERVICE_CSV_NAME,
SUBSERVICE_COLUMN_HEADER,
SQL_CREATE_SUBSVC_CMD,
SQL_DELETE_SUBSVC_CMD,
SQL_INSERT_INTO_SUBSVC_CMD,
)
from devicecommands.mib_device_command_parser import (
DeviceHandlerInformationParser,
DeviceHandlerCommandParser,
DH_COMMAND_PACKET_DEFINITION_DESTINATION,
DH_DEFINITION_DESTINATION,
DH_COMMANDS_CSV_NAME,
DH_COMMAND_HEADER_COLUMNS,
SQL_CREATE_CMDTABLE_CMD,
SQL_INSERT_INTO_CMDTABLE_CMD,
SQL_DELETE_CMDTABLE_CMD
)
from returnvalues.mib_returnvalues import (
InterfaceParser,
ReturnValueParser,
INTERFACE_DEFINITION_FILES,
RETURNVALUE_DESTINATIONS,
sql_retval_exporter,
CSV_RETVAL_FILENAME
)
from objects.mib_objects import (
ObjectDefinitionParser,
OBJECTS_DEFINITIONS,
export_object_file,
CSV_OBJECT_FILENAME,
sql_object_exporter
)
DO_EXPORT_MIB = True
PRINT_TABLES_TO_CONSOLE = False
EXPORT_TO_CSV = True
EXPORT_TO_SQL = True
COPY_FILE = False
COPY_DESTINATION = "."
FILE_SEPARATOR = ";"
EXECUTE_SQL_COMMANDS = False
def main():
"""
Performs MIB generation.
"""
parse_mib()
def parse_mib():
"""
This is the core function. It builds parses all files,
builds all tables and returns them in a tuple.
The structure of respective tables is generated in a
separate functions and is easily modifiable:
:return:
"""
handle_subservices_generation()
print()
# handle_packet_content_generation()
# print()
# handle_device_handler_command_generation()
# print()
handle_returnvalue_generation()
print()
handle_objects_generation()
print()
handle_events_generation()
print()
def handle_subservices_generation():
print("MIB Exporter: Parsing subservices")
subservice_table = generate_subservice_table()
print("MIB Exporter: Found " + str(len(subservice_table)) + " subservice entries.")
if PRINT_TABLES_TO_CONSOLE:
print("MIB Exporter: Printing subservice table: ")
Printer.print_content(subservice_table)
if EXPORT_TO_CSV:
subservice_writer = CsvWriter(
SUBSERVICE_CSV_NAME, subservice_table, SUBSERVICE_COLUMN_HEADER
)
print("MIB Exporter: Exporting to file: " + SUBSERVICE_CSV_NAME)
subservice_writer.write_to_csv()
if EXPORT_TO_SQL:
print("MIB Exporter: Exporting subservices to SQL")
sql_writer = SqlWriter()
sql_writer.delete(SQL_DELETE_SUBSVC_CMD)
sql_writer.sql_writing_helper(
SQL_CREATE_SUBSVC_CMD, SQL_INSERT_INTO_SUBSVC_CMD, subservice_table
)
def generate_subservice_table():
""" Generate the subservice table. """
subservice_header_parser = FileListParser(
destination_corrected(SUBSERVICE_DEFINITION_DESTINATION)
)
subservice_header_list = subservice_header_parser.parse_header_files(
False, "MIB Exporter: Parsing subservice files: "
)
subservice_file_parser = SubserviceParser(subservice_header_list)
subservice_table = subservice_file_parser.parse_files()
return subservice_table
def handle_packet_content_generation():
print("MIB Exporter: Parsing packing content")
packet_content_table = generate_packet_content_table()
print("MIB Exporter: Found " + str(len(packet_content_table)) + " packet content entries.")
if PRINT_TABLES_TO_CONSOLE:
print("MIB Exporter: Print packet content table: ")
Printer.print_content(packet_content_table)
if EXPORT_TO_CSV:
packet_content_writer = CsvWriter(
PACKET_CONTENT_CSV_NAME, packet_content_table, PACKET_CONTENT_HEADER_COLUMN
)
print("MIB Exporter: Exporting to file " + PACKET_CONTENT_CSV_NAME)
packet_content_writer.write_to_csv()
if EXPORT_TO_SQL:
print("MIB Exporter: Exporting packet content to SQL")
sql_writer = SqlWriter()
sql_writer.sql_writing_helper(
SQL_CREATE_PACKET_DATA_CONTENT_CMD,
SQL_INSERT_PACKET_DATA_CMD,
packet_content_table,
SQL_DELETE_PACKET_DATA_CONTENT_CMD
)
def generate_packet_content_table():
""" Generate packet content table """
packet_data_header_parser = FileListParser(
destination_corrected(PACKET_CONTENT_DEFINITION_DESTINATION)
)
packet_data_header_list = packet_data_header_parser.parse_header_files(
False, "MIB Exporter: Parsing packet data files: "
)
packet_content_file_parser = PacketContentParser(packet_data_header_list)
packet_content_table = packet_content_file_parser.parse_files()
return packet_content_table
def handle_device_handler_command_generation():
print("MIB Exporter: Parsing device handler commands.")
dh_command_table = generate_device_command_table()
print("MIB Exporter: Found " + str(len(dh_command_table)) + " device handler command entries")
if PRINT_TABLES_TO_CONSOLE:
print("MIB Exporter: Printing device handler command table: ")
Printer.print_content(dh_command_table)
if EXPORT_TO_CSV:
device_command_writer = CsvWriter(
DH_COMMANDS_CSV_NAME, dh_command_table, DH_COMMAND_HEADER_COLUMNS
)
print("MIB Exporter: Exporting device handler commands to " + DH_COMMANDS_CSV_NAME)
device_command_writer.write_to_csv()
if EXPORT_TO_SQL:
print("MIB Exporter: Exporting device handler commands to SQL")
sql_writer = SqlWriter()
sql_writer.sql_writing_helper(
SQL_CREATE_CMDTABLE_CMD, SQL_INSERT_INTO_CMDTABLE_CMD, dh_command_table,
SQL_DELETE_CMDTABLE_CMD
)
def generate_device_command_table(print_info_table: bool = False):
""" Generate device command table """
info_header_file_parser = FileListParser(
destination_corrected(DH_DEFINITION_DESTINATION)
)
info_header_file_list = info_header_file_parser.parse_header_files(
False, "MIB Exporter: Parsing device handler informations: "
)
dh_information_parser = DeviceHandlerInformationParser(info_header_file_list)
dh_information_table = dh_information_parser.parse_files()
print("MIB Exporter: Found " + str(len(dh_information_table)) +
" device handler information entries.")
if print_info_table:
Printer.print_content(
dh_information_table, "MIB Exporter: Priting device handler command information table: "
)
header_file_parser = FileListParser(
destination_corrected(DH_COMMAND_PACKET_DEFINITION_DESTINATION)
)
header_file_list = header_file_parser.parse_header_files(
False, "MIB Exporter: Parsing device handler command files: "
)
packet_subservice_parser = DeviceHandlerCommandParser(
header_file_list, dh_information_table
)
dh_command_table = packet_subservice_parser.parse_files()
return dh_command_table
def handle_returnvalue_generation():
print("MIB Exporter: Parsing returnvalues")
returnvalue_table = generate_returnvalue_table()
print("MIB Exporter: Found " + str(len(returnvalue_table)) + " returnvalues.")
if PRINT_TABLES_TO_CONSOLE:
print("MIB Exporter: Printing returnvalue table: ")
Printer.print_content(returnvalue_table)
if EXPORT_TO_CSV:
print("MIB Exporter: Exporting returnvalues to " + CSV_RETVAL_FILENAME)
ReturnValueParser.export_to_file(CSV_RETVAL_FILENAME, returnvalue_table)
if EXPORT_TO_SQL:
print("MIB Exporter: Export returnvalues to SQL: ")
sql_retval_exporter(returnvalue_table)
def generate_returnvalue_table():
interface_parser = InterfaceParser(
destination_corrected(INTERFACE_DEFINITION_FILES), False
)
interfaces = interface_parser.parse_files()
print("MIB Exporter: Found interfaces : " + str(len(interfaces)))
header_parser = FileListParser(destination_corrected(RETURNVALUE_DESTINATIONS))
header_list = header_parser.parse_header_files(True, "MIB Exporter: Parsing header file list: ")
returnvalue_parser = ReturnValueParser(interfaces, header_list, False)
returnvalue_table = returnvalue_parser.parse_files(False)
if PRINT_TABLES_TO_CONSOLE:
Printer.print_content(returnvalue_table, "Returnvalue Table: ")
return returnvalue_table
def handle_objects_generation():
print("MIB Exporter: Parsing Objects")
object_parser = ObjectDefinitionParser(destination_corrected(OBJECTS_DEFINITIONS))
object_table = object_parser.parse_files()
object_list_sorted = sorted(object_table.items())
print("MIB Exporter: Found " + str(len(object_table)) + " entries")
if EXPORT_TO_CSV:
print("MIB Exporter: Exporting to file: " + CSV_OBJECT_FILENAME)
export_object_file(CSV_OBJECT_FILENAME, object_list_sorted)
if EXPORT_TO_SQL:
print("MIB Exporter: Exporting objects into SQL table")
sql_object_exporter(object_list_sorted)
def handle_events_generation():
pass
def destination_corrected(destination_string):
"""
If headers are parsed from this directory instead of the respective
subdirectories, the destinations are at a different relative path,
so the leading "../" is stripped from each destination string.
"""
if isinstance(destination_string, list):
destination_list = []
for destination in destination_string:
destination_list.append(destination[3:])
return destination_list
return destination_string[3:]
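# Worked example (illustrative): destination_corrected("../../mission/")
# yields "../mission/", since stripping the first three characters removes
# exactly one leading "../" from the destination written for a subdirectory.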
def handle_external_file_running():
"""
Generates the MIB parser from external files
TODO: Make this stuff OOP too. Retvals and objects were already refactored
"""
os.chdir("events")
os.system("python mib_events.py")
os.chdir("..")
print_string = "Exported to file: MIB_Events.csv\r\n"
return print_string
def update_globals():
""" Updates the global variables """
g.PP = pprint.PrettyPrinter(indent=0, width=250)
g.doExportMiB = DO_EXPORT_MIB
g.executeSQLcommands = EXECUTE_SQL_COMMANDS
g.printToConsole = PRINT_TABLES_TO_CONSOLE
g.exportToCSV = EXPORT_TO_CSV
g.doCopyFile = COPY_FILE
g.copyDestination = COPY_DESTINATION
g.fileSeparator = FILE_SEPARATOR
g.fileSeparator = FILE_SEPARATOR
if __name__ == "__main__":
main()

View File

@ -0,0 +1,150 @@
#! /usr/bin/python3.8
"""
@file mib_objects.py
@brief Part of the Mission Information Base Exporter for the SOURCE project by KSat.
@details
Object exporter.
To use MySQLdb, run pip install mysqlclient or install in IDE.
On Windows, Build Tools installation might be necessary
@date 21.11.2019
"""
import re
import datetime
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import PrettyPrinter
from utility.mib_file_management import copy_file
from parserbase.mib_parser import FileParser
from utility.mib_sql_writer import SqlWriter, SQL_DATABASE_NAME
DATE_TODAY = datetime.datetime.now()
DATE_STRING_FULL = DATE_TODAY.strftime("%Y-%m-%d %H:%M:%S")
GENERATE_CSV = True
MOVE_CSV = True
GENERATE_CPP = True
COPY_CPP = True
EXPORT_TO_SQL = True
CPP_COPY_DESTINATION = "../../config/objects/"
CSV_MOVE_DESTINATION = "../"
CPP_FILENAME = "translateObjects.cpp"
CSV_OBJECT_FILENAME = "mib_objects.csv"
FILE_SEPARATOR = ";"
SUBSYSTEM_DEFINITION_DESTINATION = "../../config/objects/systemObjectList.h"
FRAMEWORK_SUBSYSTEM_DEFINITION_DESTINATION = "../../fsfw/objectmanager/frameworkObjects.h"
OBJECTS_DEFINITIONS = [SUBSYSTEM_DEFINITION_DESTINATION, FRAMEWORK_SUBSYSTEM_DEFINITION_DESTINATION]
SQL_DELETE_OBJECTS_CMD = """
DROP TABLE IF EXISTS Objects
"""
SQL_CREATE_OBJECTS_CMD = """
CREATE TABLE IF NOT EXISTS Objects(
id INTEGER PRIMARY KEY,
objectid TEXT,
name TEXT
)
"""
SQL_INSERT_INTO_OBJECTS_CMD = """
INSERT INTO Objects(objectid, name)
VALUES(?,?)
"""
def main():
print("Parsing objects: ")
list_items = parse_objects()
handle_file_export(list_items)
if EXPORT_TO_SQL:
print("ObjectParser: Exporting to SQL")
sql_object_exporter(list_items, "../" + SQL_DATABASE_NAME)
def parse_objects():
# fetch objects
object_parser = ObjectDefinitionParser(OBJECTS_DEFINITIONS)
subsystem_definitions = object_parser.parse_files()
# id_subsystem_definitions.update(framework_subsystem_definitions)
list_items = sorted(subsystem_definitions.items())
PrettyPrinter.pprint(list_items)
print("ObjectParser: Number of objects: ", len(list_items))
return list_items
def handle_file_export(list_items):
csv_writer = CsvWriter(CSV_OBJECT_FILENAME)
if GENERATE_CPP:
print("ObjectParser: Generating translation C++ file.")
write_translation_file(CPP_FILENAME, list_items)
if COPY_CPP:
print("ObjectParser: Copying object file to " + CPP_COPY_DESTINATION)
copy_file(CPP_FILENAME, CPP_COPY_DESTINATION)
if GENERATE_CSV:
print("ObjectParser: Generating text export.")
export_object_file(CSV_OBJECT_FILENAME, list_items)
if MOVE_CSV:
csv_writer.move_csv(CSV_MOVE_DESTINATION)
class ObjectDefinitionParser(FileParser):
def __init__(self, file_list: list):
super().__init__(file_list)
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
file = open(file_name, "r", encoding="utf-8")
for line in file.readlines():
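# Matches C++ object ID definitions of the shape
# "SOME_OBJECT = 0x4400AFFE" (the name here is illustrative) and stores
# them as {"0x4400AFFE": ["SOME_OBJECT"]} in the MIB table.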
match = re.search(r'([\w]*)[\s]*=[\s]*(0[xX][0-9a-fA-F]+)', line)
if match:
self.mib_table.update({match.group(2): [match.group(1)]})
def _post_parsing_operation(self):
pass
def export_object_file(filename, object_list):
file = open(filename, "w")
for entry in object_list:
file.write(str(entry[0]) + FILE_SEPARATOR + entry[1][0] + '\n')
file.close()
def write_translation_file(filename, list_of_entries):
outputfile = open(filename, "w")
print('ObjectParser: Writing translation file ' + filename)
definitions = ""
function = "const char* translateObject(object_id_t object){\n\tswitch((object&0xFFFFFFFF)){\n"
for entry in list_of_entries:
# first part of translate file
definitions += "const char *" + entry[1][0] + "_STRING = \"" + entry[1][0] + "\";\n"
# second part of translate file. entry[i] contains 32 bit hexadecimal numbers
function += "\t\tcase " + str(entry[0]) + ":\n\t\t\treturn " + entry[1][0] + "_STRING;\n"
function += '\t\tdefault:\n\t\t\treturn "UNKNOWN_OBJECT";\n'
outputfile.write("/** \n * @brief\tAuto-generated object translation file. Contains "
+ str(len(list_of_entries)) + " translations. \n"
" * Generated on: " + DATE_STRING_FULL + "\n **/ \n")
outputfile.write("#include \"translateObjects.h\"\n\n")
outputfile.write(definitions + "\n" + function + "\t}\n\treturn 0;\n}\n")
outputfile.close()
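# For a single entry ("0x4400AFFE", ["TEST_DEVICE_HANDLER"]) the generated
# file contains lines like (illustrative excerpt, see the generated
# translateObjects.cpp below):
#   const char *TEST_DEVICE_HANDLER_STRING = "TEST_DEVICE_HANDLER";
#   case 0x4400AFFE:
#       return TEST_DEVICE_HANDLER_STRING;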
def sql_object_exporter(object_table: list, sql_table: str = SQL_DATABASE_NAME):
sql_writer = SqlWriter(sql_table)
sql_writer.delete(SQL_DELETE_OBJECTS_CMD)
sql_writer.open(SQL_CREATE_OBJECTS_CMD)
for entry in object_table:
sql_writer.write_entries(
SQL_INSERT_INTO_OBJECTS_CMD, (entry[0], entry[1][0]))
sql_writer.commit()
sql_writer.close()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,116 @@
/**
* @brief Auto-generated object translation file. Contains 34 translations.
* Generated on: 2020-09-30 15:21:42
**/
#include "translateObjects.h"
const char *DUMMY1_STRING = "DUMMY1";
const char *DUMMY2_STRING = "DUMMY2";
const char *DUMMY3_STRING = "DUMMY3";
const char *DUMMY4_STRING = "DUMMY4";
const char *DUMMY5_STRING = "DUMMY5";
const char *DUMMY6_STRING = "DUMMY6";
const char *DUMMY7_STRING = "DUMMY7";
const char *DUMMY8_STRING = "DUMMY8";
const char *TIME_STAMPER_STRING = "TIME_STAMPER";
const char *SOFTWARE_STRING = "SOFTWARE";
const char *CCSDS_DISTRIBUTOR_STRING = "CCSDS_DISTRIBUTOR";
const char *PUS_DISTRIBUTOR_STRING = "PUS_DISTRIBUTOR";
const char *TEST_DEVICE_HANDLER_STRING = "TEST_DEVICE_HANDLER";
const char *TEST_ECHO_COM_IF_STRING = "TEST_ECHO_COM_IF";
const char *UDP_BRIDGE_STRING = "UDP_BRIDGE";
const char *UDP_POLLING_TASK_STRING = "UDP_POLLING_TASK";
const char *TM_FUNNEL_STRING = "TM_FUNNEL";
const char *PUS_SERVICE_1_VERIFICATION_STRING = "PUS_SERVICE_1_VERIFICATION";
const char *PUS_SERVICE_2_DEVICE_ACCESS_STRING = "PUS_SERVICE_2_DEVICE_ACCESS";
const char *PUS_SERVICE_5_EVENT_REPORTING_STRING = "PUS_SERVICE_5_EVENT_REPORTING";
const char *PUS_SERVICE_8_FUNCTION_MGMT_STRING = "PUS_SERVICE_8_FUNCTION_MGMT";
const char *PUS_SERVICE_9_TIME_MGMT_STRING = "PUS_SERVICE_9_TIME_MGMT";
const char *PUS_SERVICE_17_TEST_STRING = "PUS_SERVICE_17_TEST";
const char *PUS_SERVICE_200_MODE_MGMT_STRING = "PUS_SERVICE_200_MODE_MGMT";
const char *HEALTH_TABLE_STRING = "HEALTH_TABLE";
const char *MODE_STORE_STRING = "MODE_STORE";
const char *EVENT_MANAGER_STRING = "EVENT_MANAGER";
const char *INTERNAL_ERROR_REPORTER_STRING = "INTERNAL_ERROR_REPORTER";
const char *TC_STORE_STRING = "TC_STORE";
const char *TM_STORE_STRING = "TM_STORE";
const char *IPC_STORE_STRING = "IPC_STORE";
const char *DUMMY_CONTROLLER_STRING = "DUMMY_CONTROLLER";
const char *DUMMY_ASS_STRING = "DUMMY_ASS";
const char *NO_OBJECT_STRING = "NO_OBJECT";
const char* translateObject(object_id_t object){
switch((object&0xFFFFFFFF)){
case 0x01:
return DUMMY1_STRING;
case 0x02:
return DUMMY2_STRING;
case 0x03:
return DUMMY3_STRING;
case 0x04:
return DUMMY4_STRING;
case 0x05:
return DUMMY5_STRING;
case 0x06:
return DUMMY6_STRING;
case 0x07:
return DUMMY7_STRING;
case 0x08:
return DUMMY8_STRING;
case 0x09:
return TIME_STAMPER_STRING;
case 0x1:
return SOFTWARE_STRING;
case 0x10:
return CCSDS_DISTRIBUTOR_STRING;
case 0x11:
return PUS_DISTRIBUTOR_STRING;
case 0x4400AFFE:
return TEST_DEVICE_HANDLER_STRING;
case 0x4900AFFE:
return TEST_ECHO_COM_IF_STRING;
case 0x50000300:
return UDP_BRIDGE_STRING;
case 0x50000400:
return UDP_POLLING_TASK_STRING;
case 0x50000500:
return TM_FUNNEL_STRING;
case 0x53000001:
return PUS_SERVICE_1_VERIFICATION_STRING;
case 0x53000002:
return PUS_SERVICE_2_DEVICE_ACCESS_STRING;
case 0x53000005:
return PUS_SERVICE_5_EVENT_REPORTING_STRING;
case 0x53000008:
return PUS_SERVICE_8_FUNCTION_MGMT_STRING;
case 0x53000009:
return PUS_SERVICE_9_TIME_MGMT_STRING;
case 0x53000017:
return PUS_SERVICE_17_TEST_STRING;
case 0x53000200:
return PUS_SERVICE_200_MODE_MGMT_STRING;
case 0x53010000:
return HEALTH_TABLE_STRING;
case 0x53010100:
return MODE_STORE_STRING;
case 0x53030000:
return EVENT_MANAGER_STRING;
case 0x53040000:
return INTERNAL_ERROR_REPORTER_STRING;
case 0x534f0100:
return TC_STORE_STRING;
case 0x534f0200:
return TM_STORE_STRING;
case 0x534f0300:
return IPC_STORE_STRING;
case 0xCAFEAFFE:
return DUMMY_CONTROLLER_STRING;
case 0xCAFECAFE:
return DUMMY_ASS_STRING;
case 0xFFFFFFFF:
return NO_OBJECT_STRING;
default:
return "UNKNOWN_OBJECT";
}
return 0;
}

View File

@ -0,0 +1,305 @@
#! /usr/bin/python3.8
"""
@file mib_packet_content_parser.py
@brief Parses the Service Packet Definition files for all variables
@details Used by the MIB Exporter, inherits generic File Parser
"""
import re
from parserbase.mib_file_list_parser import FileListParser
from parserbase.mib_parser import FileParser
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer
PACKET_CONTENT_DEFINITION_DESTINATION = ["../../mission/pus/servicepackets/",
"../../fsfw/pus/servicepackets/"]
PACKET_CONTENT_CSV_NAME = "mib_packet_data_content.csv"
PACKET_CONTENT_HEADER_COLUMN = ["Service", "Subservice", "Packet Name", "Datatype", "Name",
"Size [Bytes]", "Comment"]
SQL_DELETE_PACKET_DATA_CONTENT_CMD = """
DROP TABLE IF EXISTS PacketContent;
"""
SQL_CREATE_PACKET_DATA_CONTENT_CMD = """
CREATE TABLE IF NOT EXISTS PacketContent (
id INTEGER PRIMARY KEY,
service INTEGER,
subsvc INTEGER,
packetName TEXT ,
dataType TEXT,
name TEXT,
size INTEGER,
comment TEXT
)
"""
SQL_INSERT_PACKET_DATA_CMD = """
INSERT INTO PacketContent(service,subsvc,packetName,dataType,name,size,comment)
VALUES(?,?,?,?,?,?,?)
"""
def main():
print("PacketContentParser: Parsing for header files.")
header_file_parser = FileListParser(PACKET_CONTENT_DEFINITION_DESTINATION)
header_file_list = header_file_parser.parse_header_files(False, "Parsing packet data files: ")
packet_content_parser = PacketContentParser(header_file_list)
packet_content_table = packet_content_parser.parse_files(True)
Printer.print_content(packet_content_table, "PacketContentParser: Printing packet data table:")
packet_content_writer = CsvWriter(PACKET_CONTENT_CSV_NAME,
packet_content_table, PACKET_CONTENT_HEADER_COLUMN)
packet_content_writer.write_to_csv()
packet_content_writer.move_csv("..")
# noinspection PyTypeChecker
class PacketContentParser(FileParser):
# Initialize all needed columns
def __init__(self, file_list):
super().__init__(file_list)
self.serviceColumn = 0
self.subserviceColumn = 1
self.classNameColumn = 2
self.datatypeColumn = 3
self.nameColumn = 4
self.sizeColumn = 5
self.commentColumn = 6
self.lastEntryColumn = 7
self.columnListLength = 8
self.dictEntryList = list(range(self.columnListLength-1))
self.datatypeMatch = False
self.ignoreFlag = False
def _handle_file_parsing(self, file_name: str, *args: any):
self_print_parsing_info = False
if len(args) == 1 and isinstance(args[0], bool):
self_print_parsing_info = args[0]
# Read service from file name
self.dictEntryList[self.serviceColumn] = re.search('[0-9]{1,3}', file_name).group(0)
self.dictEntryList[self.subserviceColumn] = " "
file = open(file_name, "r")
if self_print_parsing_info:
print("Parsing " + file_name + " ...")
# Scans each line for possible variables
for line in file.readlines():
# Looks for class and struct definitions which mark a PUS packet
self.scan_for_class_and_struct_match_and_handle_it(line)
# Looks for variables
self.scan_for_variable_match_and_handle_it(line)
# Operation taken when file parsing is complete
# All packet content sizes are set by analysing the datatype
def _post_parsing_operation(self):
self.update_packet_content_sizes()
def scan_for_class_and_struct_match_and_handle_it(self, line):
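# Matches definitions like "class TestPacket { //!< [EXPORT] : [SUBSERVICE] 130"
# (class name and subservice are illustrative) and extracts the class name
# plus the trailing export comment.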
class_or_struct_match = re.search(r'[\s]*class[\s]*([\w]*)[\s]*.*[\s]*{[\s]*([^\n]*)', line)
if not class_or_struct_match:
class_or_struct_match = re.search(
r'[\s]*struct[\s]*([\w]*)[\s]*.*[\s]*{[\s]*([^\n]*)', line)
if class_or_struct_match:
self.dictEntryList[self.classNameColumn] = class_or_struct_match.group(1)
if class_or_struct_match.group(2):
self.dictEntryList[self.subserviceColumn] = \
self.check_for_subservice_string(class_or_struct_match.group(2))
def scan_for_variable_match_and_handle_it(self, line):
# Look for datatype definitions
var_match = self.packet_content_matcher(line)
if var_match:
# Attempts to find variable definition inside that packet
self.update_packet_content_table()
def packet_content_matcher(self, line):
# First step: Search for possible parameter definitions
# Generic serialize element or datatypes
var_match = re.search(
r'[\w]*(?:<)?[\s]*(uint32_t|uint8_t|uint16_t|ReturnValue_t|Mode_t|Submode_t|'
r'object_id_t|float|double|bool|ActionId_t|EventId_t|sid_t|ParameterId_t)'
r'(?:>)?[\s]*([\w]*)[\s]*(?:[= 0-9]*)?[;](?:[\/!< ]*([^\n]*))?', line)
if var_match:
# Debug printout
# print(var_match.group(0))
self.handle_generic_variable_match(var_match)
# Serial Fixed Array List with Size Header
else:
var_match = re.search(r'[ \w]*<SerialFixedArrayListAdapter<([\w_, ()]*)>>'
r'[\s]*([\w]*)[\s]*[;](?:[/!< ]*([^\n]*))?', line)
if var_match:
self.handle_serial_fixed_array_match(var_match)
# Serial Buffer, No length field
if not var_match:
var_match = re.search(r'[ \w]*<SerialBufferAdapter<([\w_,]*)>>'
r'[\s]*([\w]*)[\s]*[;](?:[/!< ]*([^\n]*))?', line)
if not var_match:
var_match = re.search(r'[\w ]*(?:<)?(uint32_t|uint8_t|uint16_t)[\s]*\*'
r'(?:>)?[\s]*([\w]*)[\s]*[;](?:[/!< ]*([^\n]*))?', line)
if var_match:
self.handle_serial_buffer_match(var_match)
# Exclude size definitions in the serialize adapter and any definitions
# which are not parameter initializations or typedefs
if var_match and re.search("typedef", var_match.group(0)):
var_match = False
return var_match
def update_packet_content_table(self):
self.index = self.index + 1
dict_entry_tuple = tuple(self.dictEntryList[:self.columnListLength])
if not self.ignoreFlag:
self.mib_table.update({self.index: dict_entry_tuple})
else:
self.ignoreFlag = False
def handle_generic_variable_match(self, var_match):
self.handle_var_match(var_match)
self.handle_exporter_string(var_match.group(3))
def handle_serial_fixed_array_match(self, var_match):
if self.check_for_ignore_string(var_match.group(0)):
pass
else:
fixed_array_properties = re.search(
r'([\w_]*)[\s]*,[\s]*([\w_()]*)[\s]*,[\s]*([\w_()]*)[\s]*', var_match.group(1))
if fixed_array_properties:
type_of_next_buffer_size = fixed_array_properties.group(3)
self.index = self.index + 1
self.dictEntryList[self.datatypeColumn] = type_of_next_buffer_size
self.dictEntryList[self.nameColumn] = "Size of following buffer"
dict_entry_tuple = tuple(self.dictEntryList[:self.columnListLength])
self.mib_table.update({self.index: dict_entry_tuple})
self.handle_var_match(var_match)
self.dictEntryList[self.datatypeColumn] = fixed_array_properties.group(1) + " *"
self.handle_exporter_string(var_match.group(3))
def handle_serial_buffer_match(self, var_match):
self.handle_var_match(var_match)
self.dictEntryList[self.datatypeColumn] = var_match.group(1) + " *"
self.dictEntryList[self.sizeColumn] = "deduced"
self.handle_exporter_string(var_match.group(3))
def handle_var_match(self, var_match):
self.dictEntryList[self.commentColumn] = ""
self.dictEntryList[self.sizeColumn] = ""
self.dictEntryList[self.datatypeColumn] = var_match.group(1)
self.dictEntryList[self.nameColumn] = var_match.group(2)
def update_packet_content_sizes(self):
self.dictEntryList[self.sizeColumn] = " "
for key, content in self.mib_table.items():
content = self.attempt_uint_match(content)
if not self.datatypeMatch:
content = self.attempt_eight_byte_match(content)
if not self.datatypeMatch:
content = self.attempt_four_byte_match(content)
if not self.datatypeMatch:
content = self.attempt_two_byte_match(content)
if not self.datatypeMatch:
content = self.attempt_one_byte_match(content)
content = self.handle_uint_buffer_type(content)
self.mib_table.update({key: content})
def attempt_uint_match(self, content):
self.datatypeMatch = re.search(r'uint([\d]{1,2})_t', content[self.datatypeColumn])
if self.datatypeMatch:
content = list(content)
content[self.sizeColumn] = round(int(self.datatypeMatch.group(1)) / 8)
content = tuple(content)
return content
def attempt_four_byte_match(self, content):
self.datatypeMatch = re.search(
r'object_id_t|ActionId_t|Mode_t|float|sid_t|ParameterId_t',
content[self.datatypeColumn])
if self.datatypeMatch:
content = list(content)
content[self.sizeColumn] = 4
content = tuple(content)
return content
def attempt_eight_byte_match(self, content):
self.datatypeMatch = re.search('double', content[self.datatypeColumn])
if self.datatypeMatch:
content = list(content)
content[self.sizeColumn] = 8
content = tuple(content)
return content
def attempt_two_byte_match(self, content):
self.datatypeMatch = re.search('ReturnValue_t|EventId_t', content[self.datatypeColumn])
if self.datatypeMatch:
content = list(content)
content[self.sizeColumn] = 2
content = tuple(content)
return content
def attempt_one_byte_match(self, content):
self.datatypeMatch = re.search('Submode_t|bool', content[self.datatypeColumn])
if self.datatypeMatch:
content = list(content)
content[self.sizeColumn] = 1
content = tuple(content)
return content
def handle_uint_buffer_type(self, content):
if re.search(r'\*', content[self.datatypeColumn]):
content = list(content)
content[self.sizeColumn] = "deduced"
content = tuple(content)
return content
# Used to scan exporter string for ignore flag or store any comments
def handle_exporter_string(self, match):
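# Example exporter strings (illustrative): "//!< [EXPORT] : [TYPE] uint8_t"
# or "//!< [EXPORT] : [COMMENT] Current mode of the device".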
exporter_string = re.search(r'[ /!<]*\[EXPORT[\w]*\][\s]*:[\s]*([^\n]*)', match)
if exporter_string:
type_string = re.search("\[TYPE|BUFFERTYPE\][\s]*([\w]*)[^\n|\[]*", exporter_string.group(0),
re.IGNORECASE)
if type_string:
self.dictEntryList[self.datatypeColumn] = str(type_string.group(1)) + " *"
comment_string = re.search("\[COMMENT\][\s]*([\w]*)[^\n|\[]*", exporter_string.group(0),
re.IGNORECASE)
if comment_string:
self.dictEntryList[self.commentColumn] = comment_string.group(1)
self.check_for_ignore_string(exporter_string.group(0))
if not comment_string:
self.dictEntryList[self.commentColumn] = exporter_string.group(1)
# Used to transform comma separated subservice numbers into specific subservice numbers
def check_for_subservice_string(self, full_description):
subservice_info = re.search(
r'^.*//[\s]*[!<]*[\s]*\[EXPORT[\w]*\][\s]*:[\s]*\[SUBSERVICE\][\s]*([^\n]*)',
full_description, re.IGNORECASE)
description = ' '
if subservice_info:
description = self.handle_subservice_string(subservice_info)
if full_description == '':
description = ' '
return description
def check_for_ignore_string(self, string):
ignore_string = re.search("IGNORE", string, re.IGNORECASE)
if ignore_string:
self.ignoreFlag = True
return True
@staticmethod
def handle_subservice_string(subservice_info):
description = ' '
subservice_list = [int(x) for x in subservice_info.group(1).split(',')]
subservice_number = len(subservice_list)
for i in range(subservice_number):
description = description + str(subservice_list[i])
if i == subservice_number - 2:
description = description + " and "
elif i < subservice_number - 1:
description = description + ", "
return description
if __name__ == "__main__":
main()

View File

@ -0,0 +1,71 @@
"""
@file mib_file_list_parser.py
@brief Generic File Parser class
@details
Used to parse header files. Implemented as a class in case the header parser becomes more complex.
@author R. Mueller
@date 22.11.2019
"""
import os
import re
from typing import Union
# pylint: disable=too-few-public-methods
class FileListParser:
"""
Generic header parser which takes a directory name or directory name list
and parses all included header files recursively.
"""
def __init__(self, directory_list_or_name: Union[str, list]):
if isinstance(directory_list_or_name, str):
self.directory_list = [directory_list_or_name]
elif isinstance(directory_list_or_name, list):
self.directory_list = directory_list_or_name
else:
print("Header Parser: Passed directory list is neither a directory name "
"nor a list of directory names")
self.directory_list = []
self.header_files = []
def parse_header_files(self, search_recursively: bool = False,
printout_string: str = "Parsing header files: ",
print_current_dir: bool = False):
"""
This function is called to get a list of header files
:param search_recursively:
:param printout_string:
:param print_current_dir:
:return:
"""
print(printout_string, end="")
for directory in self.directory_list:
self.__get_header_file_list(directory, search_recursively, print_current_dir)
print(str(len(self.header_files)) + " header files were found.")
# g.PP.pprint(self.header_files)
return self.header_files
def __get_header_file_list(self, base_directory: str, search_recursively: bool = False,
print_current_dir: bool = False):
if base_directory[-1] != '/':
base_directory += '/'
local_header_files = []
if print_current_dir:
print("Parsing header files in: " + base_directory)
base_list = os.listdir(base_directory)
# g.PP.pprint(base_list)
for entry in base_list:
header_file_match = re.match(r"[_.]*.*\.h", entry)
if header_file_match:
if os.path.isfile(base_directory + entry):
match_string = header_file_match.group(0)
if match_string[0] == '.' or match_string[0] == '_':
pass
else:
local_header_files.append(base_directory + entry)
if search_recursively:
next_path = base_directory + entry
if os.path.isdir(next_path):
self.__get_header_file_list(next_path, search_recursively)
# print("Files found in: " + base_directory)
# g.PP.pprint(local_header_files)
self.header_files.extend(local_header_files)
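# Usage sketch when run directly; the directory path below is an
# illustrative assumption, not a fixed project path.
if __name__ == "__main__":
EXAMPLE_PARSER = FileListParser(["../../fsfw/"])
EXAMPLE_HEADERS = EXAMPLE_PARSER.parse_header_files(
True, "Collecting header files: ")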

View File

@ -0,0 +1,71 @@
#! /usr/bin/python3.7
"""
@file
mib_parser.py
@brief
Generic File Parser class
@details
Used by the MIB Exporter. There are two hooks which can be implemented by a child class.
Default implementations are empty.
1. _handle_file_parsing(...) which handles the parsing of one file
2. _post_parsing_operation() which can update the MIB table after all
files have been parsed
A file list to parse must be supplied.
Child classes fill out the MIB table (self.mib_table)
@author
R. Mueller
@date
14.11.2019
"""
from abc import abstractmethod
from typing import Dict
class FileParser:
"""
This parent class gathers common file parser operations into a super class.
Everything else needs to be implemented by child classes
(e.g. which strings to parse for or operations to take after file parsing has finished)
"""
def __init__(self, file_list):
if len(file_list) == 0:
print("File list is empty !")
self.file_list_empty = True
else:
self.file_list_empty = False
self.file_list = file_list
# Can be used to have unique key in MIB tables
self.index = 0
# Initialize empty MIB table which will be filled by specific parser implementation
self.mib_table = dict()
def parse_files(self, *args: any, **kwargs) -> Dict:
"""
Core method which is called to parse the files
:param args: Optional positional arguments. Passed on to the file parser
:param kwargs: Optional keyword arguments. Passed on to file parser
:return:
"""
if not self.file_list_empty:
for file_name in self.file_list:
# Implemented by child class ! Fill out info table (self.mib_table) in this routine
self._handle_file_parsing(file_name, *args, **kwargs)
# Can be implemented by child class to edit the table after it is finished.
# default implementation is empty
self._post_parsing_operation()
return self.mib_table
# Implemented by child class ! Fill out info table (self.mib_table) in this routine
@abstractmethod
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
pass
@abstractmethod
def _post_parsing_operation(self):
"""
Can be implemented by the child class to perform post-parsing operations
(e.g. setting a flag or editing MIB table entries)
:return:
"""

View File

@ -0,0 +1,331 @@
#! /usr/bin/python3.8
"""
@file
mib_returnvalues.py
@brief
Part of the Mission Information Base Exporter for the SOURCE project by KSat.
TODO: Integrate into Parser Structure instead of calling this file (no cpp file generated yet)
@details
Returnvalue exporter.
To use MySQLdb, run pip install mysqlclient or install in IDE.
On Windows, Build Tools installation might be necessary.
@date
21.11.2019
"""
import re
# import getpass
# import MySQLdb
from parserbase.mib_parser import FileParser
from parserbase.mib_file_list_parser import FileListParser
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer, PrettyPrinter
from utility.mib_sql_writer import SqlWriter
EXPORT_TO_FILE = True
MOVE_CSV_FILE = True
EXPORT_TO_SQL = True
PRINT_TABLES = True
CSV_RETVAL_FILENAME = "mib_returnvalues.csv"
CSV_MOVE_DESTINATION = "../"
FILE_SEPARATOR = ';'
MAX_STRING_LENGTH = 32
INTERFACE_DEFINITION_FILES = ["../../fsfw/returnvalues/FwClassIds.h",
"../../config/returnvalues/classIds.h"]
RETURNVALUE_DESTINATIONS = ["../../mission/", "../../fsfw/", "../../config/"]
SQL_DELETE_RETURNVALUES_CMD = """
DROP TABLE IF EXISTS Returnvalues
"""
SQL_CREATE_RETURNVALUES_CMD = """
CREATE TABLE IF NOT EXISTS Returnvalues (
id INTEGER PRIMARY KEY,
code TEXT,
name TEXT,
interface TEXT,
file TEXT,
description TEXT
)
"""
SQL_INSERT_RETURNVALUES_CMD = """
INSERT INTO Returnvalues(code,name,interface,file,description)
VALUES(?,?,?,?,?)
"""
def main():
returnvalue_table = parse_returnvalues()
if EXPORT_TO_FILE:
ReturnValueParser.export_to_file(CSV_RETVAL_FILENAME, returnvalue_table)
if MOVE_CSV_FILE:
handle_file_move(CSV_MOVE_DESTINATION)
if EXPORT_TO_SQL:
pass
# print("ReturnvalueParser: Exporting to SQL")
# sql_retval_exporter(returnvalue_table)
def parse_returnvalues():
""" Core function to parse for the return values """
interface_parser = InterfaceParser(INTERFACE_DEFINITION_FILES, PRINT_TABLES)
interfaces = interface_parser.parse_files()
header_parser = FileListParser(RETURNVALUE_DESTINATIONS)
header_list = header_parser.parse_header_files(True, "Parsing header file list: ")
returnvalue_parser = ReturnValueParser(interfaces, header_list, PRINT_TABLES)
returnvalue_table = returnvalue_parser.parse_files(True)
if PRINT_TABLES:
Printer.print_content(returnvalue_table, "Returnvalue Table: ")
print("ReturnvalueParser: "
"Found " + str(len(returnvalue_table)) + " returnvalues.")
return returnvalue_table
def handle_file_move(destination: str):
""" Handles moving the CSV file somewhere """
csv_writer = CsvWriter(CSV_RETVAL_FILENAME)
if MOVE_CSV_FILE:
csv_writer.move_csv(destination)
class InterfaceParser(FileParser):
def __init__(self, file_list, print_table):
super().__init__(file_list)
self.count = 0
self.print_table = print_table
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
try:
file = open(file_name, 'r', encoding='utf-8')
all_lines = file.readlines()
except UnicodeDecodeError:
file = open(file_name, 'r', encoding='cp1252')
all_lines = file.readlines()
if "FwClassIds.h" in file_name:
count_matched = False
# Parses first entry, which has explicit value 1
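# Illustrative line shape matched below: "SOME_CLASS_ID = 1, //ABC"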
for line in all_lines:
if not count_matched:
match = re.search(r'[\s]*([A-Z_0-9]*) = ([0-9]*),[\s]*//([A-Z]{1,3})', line)
else:
match = re.search(r'[\s]*([A-Z_0-9]*),[\s]*//([A-Z]{1,3})', line)
if match and not count_matched:
self.count = int(match.group(2))
self.mib_table.update({match.group(1): [1, match.group(3)]})
self.count += 1
count_matched = True
elif match:
self.mib_table.update({match.group(1): [self.count, match.group(2)]})
self.count += 1
elif "classIds.h" in file_name:
file = open(file_name, "r")
all_lines = file.readlines()
for line in all_lines:
match = re.search(r'[\s]*([\w]*) = FW_CLASS_ID_COUNT,[\s]*(//([A-Z]{1,3}))?', line)
if match:
self.mib_table.update({match.group(1): [self.count, match.group(2)]})
self.count += 1
for line in all_lines:
match = re.search(r'^[\s]*([\w]*)[,]*[\s]*//[!<]*[\s]*([^\n]*)', line)
if match:
self.mib_table.update({match.group(1): [self.count, match.group(2)]})
self.count += 1
def _post_parsing_operation(self):
if self.print_table:
PrettyPrinter.pprint(self.mib_table)
class ReturnValueParser(FileParser):
"""
Generic return value parser.
"""
def __init__(self, interfaces, file_list, print_tables):
super().__init__(file_list)
self.print_tables = print_tables
self.interfaces = interfaces
self.return_value_dict = dict()
self.count = 0
# Stores last three lines
self.last_lines = ["", "", ""]
self.current_interface_id_entries = {
"Name": "",
"ID": 0,
"FullName": ""
}
self.return_value_dict.update({0: ('OK', 'System-wide code for ok.', 'RETURN_OK',
'HasReturnvaluesIF.h', 'HasReturnvaluesIF')})
self.return_value_dict.update({1: ('Failed', 'Unspecified system-wide code for failed.',
'RETURN_FAILED', 'HasReturnvaluesIF.h',
'HasReturnvaluesIF')})
def _handle_file_parsing(self, file_name: str, *args, **kwargs):
try:
file = open(file_name, 'r', encoding='utf-8')
all_lines = file.readlines()
except UnicodeDecodeError:
print("ReturnValueParser: Decoding error with file " + file_name)
file = open(file_name, 'r', encoding='cp1252')
all_lines = file.readlines()
if len(args) == 1:
print_truncated_entries = args[0]
else:
print_truncated_entries = False
for line in all_lines:
self.__handle_line_reading(line, file_name, print_truncated_entries)
def __handle_line_reading(self, line, file_name, print_truncated_entries: bool):
newline = line
if self.last_lines[0] != '\n':
two_lines = self.last_lines[0] + ' ' + newline.strip()
else:
two_lines = ''
interface_id_match = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)',
two_lines)
if interface_id_match:
self.__handle_interfaceid_match(interface_id_match)
returnvalue_match = re.search(
r'^[\s]*static const(?:expr)? ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*'
r'MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//)?([^\n]*)',
two_lines)
if returnvalue_match:
self.__handle_returnvalue_match(returnvalue_match, file_name, print_truncated_entries)
self.last_lines[1] = self.last_lines[0]
self.last_lines[0] = newline
def __handle_interfaceid_match(self, interface_id_match):
# print("Interface ID" + str(match1.group(1)) + "found in " + fileName)
self.current_interface_id_entries["ID"] = \
self.interfaces[interface_id_match.group(1)][0]
self.current_interface_id_entries["Name"] = \
self.interfaces[interface_id_match.group(1)][1]
self.current_interface_id_entries["FullName"] = interface_id_match.group(1)
# print( "Current ID: " + str(self.current_interface_id_entries["ID"]) )
def __handle_returnvalue_match(self, returnvalue_match, file_name: str,
print_truncated_entries: bool):
# valueTable.append([])
description = self.clean_up_description(returnvalue_match.group(4))
string_to_add = self.build_checked_string(
self.current_interface_id_entries["Name"], returnvalue_match.group(1),
print_truncated_entries)
full_id = (self.current_interface_id_entries["ID"] << 8) + return_number_from_string(
returnvalue_match.group(2))
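# Example: interface ID 0x12 and return code 0x05 combine to the
# system-wide code 0x1205.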
if full_id in self.return_value_dict:
# print('Duplicate returncode ' + hex(full_id) + ' from ' + file_name +
# ' was already in ' + self.return_value_dict[full_id][3])
pass
self.return_value_dict.update(
{full_id: (string_to_add, description, returnvalue_match.group(1),
file_name, self.current_interface_id_entries["FullName"])})
# valueTable[count].append(fullId)
# valueTable[count].append(stringToAdd)
self.count = self.count + 1
def _post_parsing_operation(self):
if self.print_tables:
PrettyPrinter.pprint(self.return_value_dict)
self.mib_table = self.return_value_dict
@staticmethod
def export_to_file(filename: str, list_of_entries: dict):
file = open(filename, "w")
for entry in list_of_entries.items():
file.write(hex(entry[0]) + FILE_SEPARATOR + entry[1][0] + FILE_SEPARATOR + entry[1][1] +
FILE_SEPARATOR + entry[1][2] + FILE_SEPARATOR
+ entry[1][3] + FILE_SEPARATOR + entry[1][4] + '\n')
file.close()
def build_checked_string(self, first_part, second_part, print_truncated_entries: bool):
""" Build a checked string """
my_str = first_part + '_' + self.convert(second_part)
if len(my_str) > MAX_STRING_LENGTH:
if print_truncated_entries:
print("Warning: Entry " + my_str + " too long. Will truncate.")
my_str = my_str[0:MAX_STRING_LENGTH]
else:
# print("Entry: " + myStr + " is all right.")
pass
return my_str
@staticmethod
def convert(name):
single_strings = name.split('_')
new_string = ''
for one_string in single_strings:
one_string = one_string.lower()
one_string = one_string.capitalize()
new_string = new_string + one_string
return new_string
@staticmethod
def clean_up_description(descr_string):
description = descr_string.lstrip('!<- ')
if description == '':
description = ' '
return description
def return_number_from_string(a_string):
if a_string.startswith('0x'):
return int(a_string, 16)
if a_string.isdigit():
return int(a_string)
print('Error: Illegal number representation: ' + a_string)
return 0
def sql_retval_exporter(returnvalue_table):
sql_writer = SqlWriter()
sql_writer.open(SQL_CREATE_RETURNVALUES_CMD)
for entry in returnvalue_table.items():
sql_writer.write_entries(
SQL_INSERT_RETURNVALUES_CMD, (entry[0],
entry[1][2],
entry[1][4],
entry[1][3],
entry[1][1]))
sql_writer.commit()
sql_writer.close()
# def writeEntriesToDB(listOfEntries):
# print("Connecting to database...")
# user = getpass.getpass("User: ")
# passwd = getpass.getpass()
# conn = MySQLdb.connect(host="127.0.0.1", user=user, passwd=passwd, db="flpmib")
# written = conn.cursor()
# print("done.")
# # delete old entries
# print("Kill old entries.")
# written.execute("DELETE FROM txp WHERE TXP_NUMBR = 'DSX00000'")
# print("Insert new ones:")
# for entry in listOfEntries.items():
# written.execute("INSERT INTO txp (txp_numbr, txp_from, txp_to, txp_altxt) "
# "VALUES ('DSX00000', %s, %s, %s)", [entry[0], entry[0], entry[1][0]])
# conn.commit()
# print("Done. That was easy.")
#
#
# def writeEntriesToOtherDB(listOfEntries):
# print("Connecting to other database...")
# conn = MySQLdb.connect(host="buggy.irs.uni-stuttgart.de",
# user='returncode', passwd='returncode', db="returncode")
# written = conn.cursor()
# print("connected.")
# # delete old entries
# print("Kill old entries.")
# written.execute("DELETE FROM returncodes WHERE true")
# print("Insert new ones:")
# for entry in listOfEntries.items():
# written.execute("INSERT INTO returncodes (code,name,interface,file,description) "
# "VALUES (%s, %s, %s, %s, %s)",
# [entry[0], entry[1][2], entry[1][4], entry[1][3], entry[1][1]])
# conn.commit()
# print("Done. That was hard.")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,242 @@
"""
@file mib_subservice_parser.py
@brief Parses the Subservice definitions for the Mission Information Base.
@details Used by the MIB Exporter, inherits generic File Parser
@author R. Mueller
@date 14.11.2019
Example Stringset to scan for:
enum Subservice: uint8_t {
//!< [EXPORT] : [COMMAND] Perform connection test
CONNECTION_TEST = 1,
//!< [EXPORT] : [REPLY] Connection test reply
CONNECTION_TEST_REPORT = 2,
EVENT_TRIGGER_TEST = 128, //!< [EXPORT] : [COMMAND] Trigger test reply and test event
MULTIPLE_EVENT_TRIGGER_TEST = 129, //!< [EXPORT] : [COMMAND] Trigger multiple events (5)
MULTIPLE_CONNECTION_TEST = 130 //!< [EXPORT] : [COMMAND] Trigger multiple connection tests
};
"""
import re
from enum import Enum
from parserbase.mib_file_list_parser import FileListParser
from parserbase.mib_parser import FileParser
from utility.mib_csv_writer import CsvWriter
from utility.mib_printer import Printer
SUBSERVICE_DEFINITION_DESTINATION = ["../../mission/", "../../fsfw/pus/"]
SUBSERVICE_CSV_NAME = "mib_subservices.csv"
SUBSERVICE_COLUMN_HEADER = ["Service", "Subservice Name", "Subservice Number", "Type", "Comment"]
SQL_DELETE_SUBSVC_CMD = """
DROP TABLE IF EXISTS Subservice;
"""
SQL_CREATE_SUBSVC_CMD = """
CREATE TABLE IF NOT EXISTS Subservice(
id INTEGER PRIMARY KEY,
service INTEGER,
subsvcName TEXT,
subsvcNumber INTEGER,
type TEXT CHECK( type IN ('TC','TM')),
comment TEXT
)
"""
SQL_INSERT_INTO_SUBSVC_CMD = """
INSERT INTO Subservice(service,subsvcName,subsvcNumber,type,comment)
VALUES(?,?,?,?,?)
"""
class SubserviceColumns(Enum):
"""
Specifies order of MIB columns
"""
SERVICE = 0
NAME = 1
NUMBER = 2
TYPE = 3
COMMENT = 4
Clmns = SubserviceColumns
def main():
"""
If this file is run separately, this main will be run.
:return:
"""
header_parser = FileListParser(SUBSERVICE_DEFINITION_DESTINATION)
header_file_list = header_parser.parse_header_files(False, "Parsing subservice header files: ")
packet_subservice_parser = SubserviceParser(header_file_list)
subservice_table = packet_subservice_parser.parse_files()
Printer.print_content(subservice_table, "Printing subservice table:")
print("Found " + str(len(subservice_table)) + " subservice entries.")
subservice_writer = CsvWriter(SUBSERVICE_CSV_NAME, subservice_table, SUBSERVICE_COLUMN_HEADER)
subservice_writer.write_to_csv()
subservice_writer.move_csv("..")
# TODO: Not really happy with the multi-line implementation, but this is not trivial.
# Right now, we are not using all of the stored last lines, we just store the string
# of the last line (if it is only a comment). It probably would be better to always
# scan 3 or 4 lines at once. However, that is not easy either.
# pylint: disable=too-few-public-methods
class SubserviceParser(FileParser):
"""
This parser class can parse the subservice definitions.
"""
def __init__(self, file_list: list):
super().__init__(file_list)
# Column System allows reshuffling of table columns in constructor
self.clmns_len = SubserviceColumns.__len__()
# This list holds the current new table entry,
# which is updated while parsing before being committed to the table
self.dict_entry_list = list(range(self.clmns_len))
self.dict_entry_list[Clmns.COMMENT.value] = ""
self.subservice_enum_found = False
# This list will store the last three lines for longer comments.
self.last_line_list = ["", "", ""]
# If an export command was found, cache the possibility of a match.
self.possible_match_on_next_lines = False
# This is called for every file
def _handle_file_parsing(self, file_name: str, *args: any, **kwargs):
self_print_parsing_info = False
if len(args) == 1 and isinstance(args[0], bool):
self_print_parsing_info = args[0]
# Read service from file name
service_match = re.search('Service[^0-9]*([0-9]{1,3})', file_name)
if service_match:
self.dict_entry_list[Clmns.SERVICE.value] = service_match.group(1)
self.dict_entry_list[Clmns.NAME.value] = " "
file = open(file_name, "r")
if self_print_parsing_info:
print("Parsing " + file_name + " ...")
# Scans each line for possible variables
for line in file.readlines():
self.__handle_line_reading(line)
def __handle_line_reading(self, line):
"""
Handles the reading of single lines.
:param line:
:return:
"""
# Case insensitive matching
enum_match = re.search(r'[\s]*enum[\s]*Subservice([^\n]*)', line, re.IGNORECASE)
if enum_match:
self.subservice_enum_found = True
if self.subservice_enum_found:
self.__handle_enum_scanning(line)
self.last_line_list[2] = self.last_line_list[1]
self.last_line_list[1] = self.last_line_list[0]
self.last_line_list[0] = line
def __handle_enum_scanning(self, line: str):
"""
Two-line reading: first check the last line for an export command.
"""
self.__scan_for_export_command(self.last_line_list[0])
subservice_match = self.__scan_subservices(line)
if subservice_match:
self.index = self.index + 1
dict_entry_tuple = tuple(self.dict_entry_list[:self.clmns_len])
self.mib_table.update({self.index: dict_entry_tuple})
self.__clear_tuple()
def __clear_tuple(self):
self.dict_entry_list[Clmns.NAME.value] = ""
self.dict_entry_list[Clmns.TYPE.value] = ""
self.dict_entry_list[Clmns.NUMBER.value] = ""
self.dict_entry_list[Clmns.COMMENT.value] = ""
self.possible_match_on_next_lines = False
def __scan_for_export_command(self, line: str) -> bool:
command_string = re.search(r"([^\[]*)\[export\][: ]*\[([\w]*)\][\s]*([^\n]*)",
line, re.IGNORECASE)
if command_string:
# Check whether there is a separated export command
# (export command is not on same line as subservice definition)
# ugly solution but has worked so far.
string = command_string.group(1).lstrip()
if len(string) <= 8:
self.possible_match_on_next_lines = True
if self.__scan_for_type(line):
self.__scan_for_comment(line)
return True
self.__add_possible_comment_string(line)
return False
def __add_possible_comment_string(self, line):
"""
If no command was found, the line might be a continuation of a comment.
Strip whitespaces and comment symbols and add to comment buffer.
"""
possible_multiline_comment = line.lstrip()
possible_multiline_comment = possible_multiline_comment.lstrip('/')
possible_multiline_comment = possible_multiline_comment.lstrip('<')
possible_multiline_comment = possible_multiline_comment.lstrip('!')
possible_multiline_comment = possible_multiline_comment.rstrip()
if len(possible_multiline_comment) > 0:
self.dict_entry_list[Clmns.COMMENT.value] += possible_multiline_comment
def __scan_subservices(self, line):
"""
Scan for subservice match.
:param line:
:return:
"""
subservice_match = \
re.search(r"[\s]*([\w]*)[\s]*=[\s]*([0-9]{1,3})(?:,)?(?:[ /!<>]*([^\n]*))?", line)
if subservice_match:
self.dict_entry_list[Clmns.NAME.value] = subservice_match.group(1)
self.dict_entry_list[Clmns.NUMBER.value] = subservice_match.group(2)
# I am assuming that an export string is longer than 7 chars.
if len(subservice_match.group(3)) > 7:
# Export command on same line overrides old commands. Read for comment.
if self.__process_comment_string(subservice_match.group(3)):
return True
# Check whether exporting was commanded on last lines
return bool(self.possible_match_on_next_lines)
if re.search(r'}[\s]*;', line):
self.subservice_enum_found = False
return subservice_match
def __process_comment_string(self, comment_string) -> bool:
# look for packet type specifier
export_command_found = self.__scan_for_type(comment_string)
# Look for everything after [EXPORT] : [TYPESPECIFIER] as comment
if export_command_found:
self.__scan_for_comment(comment_string)
return export_command_found
def __scan_for_type(self, string) -> bool:
type_match = re.search(r'\[reply\]|\[tm\]', string, re.IGNORECASE)
if type_match:
self.dict_entry_list[Clmns.TYPE.value] = 'TM'
return True
type_match = re.search(r'\[command\]|\[tc\]', string, re.IGNORECASE)
if type_match:
self.dict_entry_list[Clmns.TYPE.value] = 'TC'
return True
self.dict_entry_list[Clmns.TYPE.value] = 'Unspecified'
return False
def __scan_for_comment(self, comment_string):
comment_match = re.search(r':[\s]*\[[\w]*\][\s]*([^\n]*)', comment_string)
if comment_match:
self.dict_entry_list[Clmns.COMMENT.value] = comment_match.group(1)
def _post_parsing_operation(self):
pass
if __name__ == "__main__":
main()

View File

@ -0,0 +1,62 @@
#! /usr/bin/python3.7
"""
@file
mib_csv_writer.py
@brief
CSV Writer
@details
This class writes tables to a csv.
@author
R. Mueller
@date
14.11.2019
"""
from utility import mib_globals as g
from utility.mib_file_management import copy_file, move_file
# TODO: Export to SQL
class CsvWriter:
def __init__(self, filename, table_to_print=None, header_array=None):
if header_array is None:
header_array = []
if table_to_print is None:
table_to_print = dict()
self.filename = filename
self.tableToPrint = table_to_print
self.headerArray = header_array
self.columnNumbers = len(self.headerArray)
self.fileSeparator = g.fileSeparator
def write_to_csv(self):
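# Output shape (illustrative): a header line such as
# "Index;Service;Subservice Name;..." followed by one
# semicolon-separated row per table entry.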
file = open(self.filename, "w")
file.write("Index" + self.fileSeparator)
for index in range(self.columnNumbers):
# noinspection PyTypeChecker
if index < len(self.headerArray)-1:
file.write(self.headerArray[index] + self.fileSeparator)
else:
file.write(self.headerArray[index] + "\n")
for index, entry in self.tableToPrint.items():
file.write(str(index) + self.fileSeparator)
for columnIndex in range(self.columnNumbers):
# noinspection PyTypeChecker
if columnIndex < len(self.headerArray) - 1:
file.write(str(entry[columnIndex]) + self.fileSeparator)
else:
file.write(str(entry[columnIndex]) + "\n")
file.close()
def copy_csv(self, copy_destination: str = g.copyDestination):
copy_file(self.filename, copy_destination)
print("CSV file was copied to " + copy_destination)
def move_csv(self, move_destination):
move_file(self.filename, move_destination)
if move_destination == ".." or move_destination == "../":
print("CSV Writer: CSV file was moved to parser root directory")
else:
print("CSV Writer: CSV file was moved to " + move_destination)

View File

@ -0,0 +1,22 @@
#! /usr/bin/python3.8
# -*- coding: utf-8 -*-
import shutil
import os
def copy_file(filename: str, destination: str = ""):
if os.path.exists(filename):
try:
shutil.copy2(filename, destination)
except FileNotFoundError as error:
print("File not found!")
print(error)
def move_file(file_name: str, destination: str= ""):
if os.path.exists(file_name):
try:
shutil.copy2(file_name, destination)
os.remove(file_name)
except FileNotFoundError as error:
print("File not found!")
print(error)

View File

@ -0,0 +1,18 @@
"""
@file
mib_globals.py
@date
16.11.2019
@brief
Global settings for MIB exporter
"""
import pprint
doExportMiB = True
executeSQLcommands = False
printToConsole = True
exportToCSV = True
doCopyFile = False
copyDestination = "."
fileSeparator = ';'

View File

@ -0,0 +1,15 @@
import pprint
PrettyPrinter = pprint.PrettyPrinter(indent=0, width=250)
class Printer:
def __init__(self):
pass
@staticmethod
def print_content(dictionary, leading_string: str = ""):
if leading_string != "":
print(leading_string)
PrettyPrinter.pprint(dictionary)
print("\r\n", end="")

View File

@ -0,0 +1,37 @@
import sqlite3
SQL_DATABASE_NAME = "obsw_mib.db"
class SqlWriter:
def __init__(self, filename: str = SQL_DATABASE_NAME):
self.filename = filename
self.conn = sqlite3.connect(self.filename)
def open(self, sql_creation_command: str):
print("SQL Writer: Opening " + self.filename)
self.conn.execute(sql_creation_command)
def delete(self, sql_deletion_command):
print("SQL Writer: Deleting SQL table")
self.conn.execute(sql_deletion_command)
def write_entries(self, sql_insertion_command, current_entry):
cur = self.conn.cursor()
cur.execute(sql_insertion_command, current_entry)
return cur.lastrowid
def commit(self):
print("SQL Writer: Commiting SQL table")
self.conn.commit()
def close(self):
self.conn.close()
def sql_writing_helper(self, creation_cmd, insertion_cmd, mib_table: dict, deletion_cmd: str = ""):
if deletion_cmd != "":
self.delete(deletion_cmd)
self.open(creation_cmd)
for i in mib_table:
self.write_entries(insertion_cmd, mib_table[i])
self.commit()
self.close()
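# Small self-test sketch when run directly. The table layout and file name
# below are illustrative assumptions, not part of the MIB schema.
if __name__ == "__main__":
DEMO_WRITER = SqlWriter("demo_mib.db")
DEMO_WRITER.sql_writing_helper(
"CREATE TABLE IF NOT EXISTS Demo(id INTEGER PRIMARY KEY, name TEXT)",
"INSERT INTO Demo(name) VALUES(?)",
{0: ("example_entry",)},
"DROP TABLE IF EXISTS Demo"
)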

View File

@ -0,0 +1,197 @@
#! /usr/bin/env python3
import sys
import re
import os
import getpass
import MySQLdb
def parseInterfaceDefinitionFile( fwFilename, missionFilename ):
file = open(fwFilename, "r")
interfaces = dict()
allLines = file.readlines()
count = 0
for line in allLines:
match = re.search(r'[\s]*([A-Z_0-9]*) = ([0-9]*),[\s]*//([A-Z]{1,3})', line)
if match:
count = int(match.group(2))
print("Count is " + str(count))
interfaces.update( {match.group(1): [1, match.group(3)]} )
for line in allLines:
match = re.search(r'[\s]*([A-Z_0-9]*),[\s]*//([A-Z]{1,3})', line)
if match:
interfaces.update( {match.group(1): [count, match.group(2)]} )
count += 1
file = open(missionFilename, "r")
allLines = file.readlines()
for line in allLines:
match = re.search(r'[\s]*([A-Z_0-9]*) = FW_CLASS_ID_COUNT,[\s]*//([A-Z]{1,3})', line)
if match:
interfaces.update( {match.group(1): [count, match.group(2)]} )
count += 1
for line in allLines:
match = re.search(r'^[\s]*([A-Z_0-9]*),[\s]*//([A-Z]{1,3})', line)
if match:
interfaces.update( {match.group(1): [count, match.group(2)]} )
count += 1
print("Found interfaces : " + str(count - 1))
return interfaces
def returnNumberFromString(aString):
if aString.startswith('0x'):
return int(aString, 16)
elif aString.isdigit():
return int(aString)
else:
print('Error: Illegal number representation: ' + aString)
return 0
def convert(name):
singleStrings = name.split('_')
newString = ''
for oneString in singleStrings:
oneString = oneString.lower()
oneString = oneString.capitalize()
newString = newString + oneString
return newString
def buildCheckedString( firstPart, secondPart ):
myStr = firstPart + convert(secondPart)
if len(myStr) > 14:
print( "Error: Entry: " + myStr + " too long. Will truncate.")
myStr = myStr[0:14]
else:
print( "Entry: " + myStr + " is all right.")
return myStr
def cleanUpDescription( descrString ):
description = descrString.lstrip('!<- ')
if description == '':
description = ' '
return description
def parseHeaderFiles( interfaceList, fileList):
dictionary = dict()
count = 0
currentId = 0
currentName = ""
#Add OK and UNSPECIFIED_FAILED
dictionary.update( {0: ('OK', 'System-wide code for ok.', 'RETURN_OK', 'HasReturnvaluesIF.h', 'HasReturnvaluesIF')} )
dictionary.update( {1: ('Failed', 'Unspecified system-wide code for failed.', 'RETURN_FAILED', 'HasReturnvaluesIF.h', 'HasReturnvaluesIF')} )
for fileName in fileList:
file = open(fileName, "r")
oldline = file.readline()
while True:
newline = file.readline()
if not newline: break #EOF
if not oldline == '\n':
twoLines = oldline + ' ' + newline.strip()
else:
twoLines = ''
match1 = re.search(r'INTERFACE_ID[\s]*=[\s]*CLASS_ID::([a-zA-Z_0-9]*)', twoLines)
if match1:
currentId = interfaceList[match1.group(1)][0]
currentName = interfaceList[match1.group(1)][1]
currentFullName = match1.group(1)
#print( "Current ID: " + str(currentId) )
myId = currentId
match = re.search(r'^[\s]*static const ReturnValue_t[\s]*([a-zA-Z_0-9]*)[\s]*=[\s]*MAKE_RETURN_CODE[\s]*\([\s]*([x0-9a-fA-F]{1,4})[\s]*\);[\t ]*(//){0,1}([^\n]*)', twoLines)
if match:
# valueTable.append([])
description = cleanUpDescription(match.group(4))
stringToAdd = buildCheckedString( currentName, match.group(1))
fullId = (myId << 8) + returnNumberFromString(match.group(2))
if fullId in dictionary:
print('duplicate returncode ' + hex(fullId) + ' from ' + fileName + ' was already in ' + dictionary[fullId][3])
dictionary.update( {fullId: (stringToAdd, description, match.group(1), fileName, currentFullName)} )
# valueTable[count].append(fullId)
# valueTable[count].append(stringToAdd)
count = count+1
oldline = newline
# valueTable.pop()
return dictionary
def getHeaderFileList( base ):
#print ( "getHeaderFileList called with" + base )
baseList = os.listdir( base )
fileList = []
for entry in baseList:
#Remove all hidden files:
if ( (os.path.isdir(base + entry) == True) and (entry[0] != ".") and (entry[0] != "_") ):
fileList = fileList + getHeaderFileList(base + entry + "/")
if re.match("[^\.]*\.h", entry) and os.path.isfile(base + entry):
fileList.append(base + entry)
return fileList
def writeEntriesToDB( listOfEntries ):
print ("Connecting to database...")
user = getpass.getpass("User: ")
passwd = getpass.getpass()
conn = MySQLdb.connect(host = "127.0.0.1", user = user, passwd = passwd, db = "flpmib")
written = conn.cursor()
print ("done.")
#delete old entries
print ("Kill old entries.")
written.execute("DELETE FROM txp WHERE TXP_NUMBR = 'DSX00000'")
print("Insert new ones:")
for entry in listOfEntries.items():
written.execute("INSERT INTO txp (txp_numbr, txp_from, txp_to, txp_altxt) VALUES ('DSX00000', %s, %s, %s)", [entry[0], entry[0], entry[1][0]])
conn.commit()
print("Done. That was easy.")
def writeEntriesToOtherDB( listOfEntries ):
print ("Connecting to other database...")
conn = MySQLdb.connect(host = "buggy.irs.uni-stuttgart.de", user = 'returncode', passwd = 'returncode', db = "returncode")
written = conn.cursor()
print ("connected.")
#delete old entries
print ("Kill old entries.")
written.execute("DELETE FROM returncodes WHERE true")
print("Insert new ones:")
for entry in listOfEntries.items():
written.execute("INSERT INTO returncodes (code,name,interface,file,description) VALUES (%s, %s, %s, %s, %s)", [entry[0], entry[1][2], entry[1][4], entry[1][3], entry[1][1]])
conn.commit()
print("Done. That was hard.")
def exportToFile( filename, listOfEntries ):
print ('Exporting to file: ' + filename )
file = open(filename, "w")
for entry in listOfEntries.items():
file.write(hex(entry[0]) + '\t' + entry[1][0] + '\t' + entry[1][1] + '\t' + entry[1][2] + '\t' + entry[1][3] + '\t' + entry[1][4] + '\n' )
file.close()
return
def parseOBSW():
idInterfaceDefinitions = parseInterfaceDefinitionFile( "../../framework/returnvalues/FwClassIds.h", "../../config/returnvalues/classIds.h")
#print ("Dictionary size is " + str(len(idInterfaceDefinitions)) )
for entry in sorted(idInterfaceDefinitions):
print(entry)
myHeaderList = getHeaderFileList( "../../mission/" )
myHeaderList = myHeaderList + getHeaderFileList( "../../framework/" )
myHeaderList = myHeaderList + getHeaderFileList( "../../config/" )
myHeaderList = myHeaderList + getHeaderFileList( "../../bsp_linux/" )
mySecondList = parseHeaderFiles( idInterfaceDefinitions, myHeaderList)
# print(mySecondList[14081])
# print (mySecondList.items()[0][1])
# print( "Found entries:" )
counter = 0
# for entry in sorted(mySecondList):
# print(entry)
for entry in mySecondList.items():
counter = counter + 1
# print( entry[0], entry[1][0], entry[1][1] )
print("Count: ", counter)
if (len(sys.argv) > 1):
exportToFile( str(sys.argv[1]), mySecondList )
else:
print('No export to file requested.')
#writeEntriesToOtherDB( mySecondList )
#writeEntriesToDB( mySecondList )
parseOBSW()