WIP: parsing Thermal Controller TM and sending it via TCP/IP
This commit is contained in:
parent
aadbfb2594
commit
05ca9e37c3
@ -2,6 +2,8 @@
|
|||||||
import struct
|
import struct
|
||||||
import os
|
import os
|
||||||
import datetime
|
import datetime
|
||||||
|
import json
|
||||||
|
import socket
|
||||||
|
|
||||||
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
from tmtccmd.utility.tmtc_printer import FsfwTmTcPrinter
|
||||||
from tmtccmd.config.definitions import HkReplyUnpacked
|
from tmtccmd.config.definitions import HkReplyUnpacked
|
||||||
@ -21,6 +23,12 @@ import config.object_ids as obj_ids
|
|||||||
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
|
from pus_tm.devs.reaction_wheels import handle_rw_hk_data
|
||||||
from pus_tm.defs import PrintWrapper, FsfwTmTcPrinter, log_to_both
|
from pus_tm.defs import PrintWrapper, FsfwTmTcPrinter, log_to_both
|
||||||
|
|
||||||
|
|
||||||
|
#TODO add to configuration parameters
|
||||||
|
THERMAL_HOST = "127.0.0.1"
|
||||||
|
THERMAL_PORT = 7302
|
||||||
|
|
||||||
|
|
||||||
LOGGER = get_console_logger()
|
LOGGER = get_console_logger()
|
||||||
|
|
||||||
|
|
||||||
@ -98,6 +106,8 @@ def handle_regular_hk_print(
|
|||||||
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
handle_p60_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
||||||
if objb == obj_ids.PL_PCDU_ID:
|
if objb == obj_ids.PL_PCDU_ID:
|
||||||
log_to_both(printer, "Received PL PCDU HK data")
|
log_to_both(printer, "Received PL PCDU HK data")
|
||||||
|
if objb == obj_ids.THERMAL_CONTROLLER_ID:
|
||||||
|
handle_thermal_controller_hk_data(printer=printer, set_id=set_id, hk_data=hk_data)
|
||||||
else:
|
else:
|
||||||
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
|
LOGGER.info("Service 3 TM: Parsing for this SID has not been implemented.")
|
||||||
return HkReplyUnpacked()
|
return HkReplyUnpacked()
|
||||||
@ -289,6 +299,47 @@ def handle_self_test_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
|||||||
log_to_both(printer, str(content_list))
|
log_to_both(printer, str(content_list))
|
||||||
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
printer.print_validity_buffer(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||||
|
|
||||||
|
def handle_thermal_controller_hk_data(printer: FsfwTmTcPrinter, set_id: int, hk_data: bytes):
|
||||||
|
if set_id == 0:
|
||||||
|
LOGGER.info("Received Sensor Temperature data")
|
||||||
|
|
||||||
|
#get all the floats
|
||||||
|
tm_data = struct.unpack("!ffffffffffffffff", hk_data[:16*4])
|
||||||
|
parsed_data = {}
|
||||||
|
|
||||||
|
#put them into a nice dictionary
|
||||||
|
parsed_data["SID"] = set_id
|
||||||
|
parsed_data["content"] = {}
|
||||||
|
parsed_data["content"]["SENSOR_PLOC_HEATSPREADER"] = tm_data[0]
|
||||||
|
parsed_data["content"]["SENSOR_PLOC_MISSIONBOARD"] = tm_data[1]
|
||||||
|
parsed_data["content"]["SENSOR_4K_CAMERA"] = tm_data[2]
|
||||||
|
parsed_data["content"]["SENSOR_DAC_HEATSPREADER"] = tm_data[3]
|
||||||
|
parsed_data["content"]["SENSOR_STARTRACKER"] = tm_data[4]
|
||||||
|
parsed_data["content"]["SENSOR_RW1"] = tm_data[5]
|
||||||
|
parsed_data["content"]["SENSOR_DRO"] = tm_data[6]
|
||||||
|
parsed_data["content"]["SENSOR_SCEX"] = tm_data[7]
|
||||||
|
parsed_data["content"]["SENSOR_X8"] = tm_data[8]
|
||||||
|
parsed_data["content"]["SENSOR_HPA"] = tm_data[9]
|
||||||
|
parsed_data["content"]["SENSOR_TX_MODUL"] = tm_data[10]
|
||||||
|
parsed_data["content"]["SENSOR_MPA"] = tm_data[11]
|
||||||
|
parsed_data["content"]["SENSOR_ACU"] = tm_data[12]
|
||||||
|
parsed_data["content"]["SENSOR_PLPCDU_HEATSPREADER"] = tm_data[13]
|
||||||
|
parsed_data["content"]["SENSOR_TCS_BOARD"] = tm_data[14]
|
||||||
|
parsed_data["content"]["SENSOR_MAGNETTORQUER"] = tm_data[15]
|
||||||
|
|
||||||
|
#which in turn will become a json to be sent over the wire
|
||||||
|
json_string = json.dumps(parsed_data)
|
||||||
|
#print(json_string)
|
||||||
|
|
||||||
|
#try to send it to a tcp server
|
||||||
|
try:
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((THERMAL_HOST, THERMAL_PORT))
|
||||||
|
s.sendall(bytes(json_string, encoding="utf-8"))
|
||||||
|
except:
|
||||||
|
#fail silently if there is noone listening, should be a non breaking feature
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
def handle_gps_data(printer: FsfwTmTcPrinter, hk_data: bytes):
|
||||||
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
|
LOGGER.info(f"Received GPS data, HK data length {len(hk_data)}")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user