import socket from typing import Optional import json import base64 from tmtccmd.logging import get_console_logger from tmtccmd.utility.obj_id import ObjectId from dle_encoder import DleEncoder # TODO add to configuration parameters SERVER_HOST = "" SERVER_PORT = 7305 LOGGER = get_console_logger() class TmTcpServer: _Instance = None def __init__( self): self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((SERVER_HOST, SERVER_PORT)) # for now, only accept one connection self.server_socket.listen(0) self.server_socket.setblocking(False) self.client_connection: Optional[socket.socket] = None self.dle_encoder = DleEncoder() def __del__(self): try: self.close() except: LOGGER.warning("Could not close sockets!") def close(self): self.server_socket.close() if self.client_connection != None: self.client_connection.close() def getInstance(): if TmTcpServer._Instance == None: TmTcpServer._Instance = TmTcpServer() return TmTcpServer._Instance def _send_dictionary_over_socket(self, dictionary): # keep listeners current if self.client_connection == None: # no running connection, see if a client wants to connect try: (self.client_connection, _) = self.server_socket.accept() self.client_connection.setblocking(False) except: # no client waiting return data_json_bytes = json.dumps(dictionary).encode() # dle encode the bytes # adding a newline because someone might want to look at it in a console data_json_bytes = self.dle_encoder.encode(data_json_bytes + b'\n') try: sent_length = self.client_connection.send(data_json_bytes) except: self.client_connection = None return if sent_length == 0: self.client_connection.close() self.client_connection = None def report_raw_hk_data(self, object_id: ObjectId, set_id: int, hk_data: bytes): data_dict = {} data_dict["type"] = "TM" data_dict["tmType"] = "Raw HK" data_dict["objectId"] = object_id.as_string data_dict["setId"] = set_id data_dict["rawData"] = base64.b64encode(hk_data).decode() self._send_dictionary_over_socket(data_dict) def report_parsed_hk_data(self, object_id: ObjectId, set_id: int, data_dictionary): data_dict = {} data_dict["type"] = "TM" data_dict["tmType"] = "Parsed HK" data_dict["objectId"] = object_id.as_string data_dict["setId"] = set_id data_dict["content"] = data_dictionary self._send_dictionary_over_socket(data_dict)