that's a lot of work..
EIVE/-/pipeline/pr-main This commit looks good Details

This commit is contained in:
Robin Müller 2023-11-13 13:48:08 +01:00
parent 2259d269dd
commit c0ab3351c9
Signed by: muellerr
GPG Key ID: A649FB78196E3849
1 changed files with 101 additions and 50 deletions

View File

@ -1,6 +1,9 @@
import dataclasses
from datetime import datetime
import struct import struct
import logging
import sqlite3 import sqlite3
from typing import List, Optional, Tuple from typing import List, Tuple
from uuid import UUID from uuid import UUID
from eive_tmtc.tmtc.power.acu import acu_config_table_handler from eive_tmtc.tmtc.power.acu import acu_config_table_handler
@ -22,6 +25,8 @@ from eive_tmtc.config.object_ids import (
ACU_HANDLER_ID, ACU_HANDLER_ID,
) )
_LOGGER = logging.getLogger(__name__)
P60_INDEX_LIST = [ P60_INDEX_LIST = [
"ACU VCC", "ACU VCC",
"PDU1 VCC", "PDU1 VCC",
@ -149,11 +154,23 @@ class DevicesInfoParser:
return "Unknown Type" return "Unknown Type"
@dataclasses.dataclass
class PduData:
boot_count: int
batt_mode: int
temperature: float
vcc: int
vbat: int
out_enables: List[bool]
voltages: List[int]
currents: List[int]
def handle_pdu_data( def handle_pdu_data(
hk_data: bytes, hk_data: bytes,
hk_packet: Service3FsfwTm, hk_packet: Service3FsfwTm,
packet_uuid: UUID, packet_uuid: UUID,
con: Optional[sqlite3.Connection], con: sqlite3.Connection,
pw: PrintWrapper, pw: PrintWrapper,
pdu_idx: int, pdu_idx: int,
set_id: int, set_id: int,
@ -230,58 +247,92 @@ def handle_pdu_data(
f"Boot Count {boot_count} | Battery Mode {batt_mode} | " f"Boot Count {boot_count} | Battery Mode {batt_mode} | "
f"Temperature {temperature} | VCC {vcc} | VBAT {vbat}" f"Temperature {temperature} | VCC {vcc} | VBAT {vbat}"
) )
if con is not None: try:
packet_dt = hk_packet.pus_tm.time_provider.as_datetime() # type: ignore handle_pdu_db_insertion(
cursor = con.cursor() con,
if pdu_idx == 1: packet_uuid,
tbl_base_name = "Pdu1_" hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore
channel_list = PDU1_CHANNELS_NAMES pdu_idx,
else: PduData(
tbl_base_name = "Pdu2_" boot_count,
channel_list = PDU2_CHANNELS_NAMES batt_mode,
cursor.execute( temperature,
f""" vcc,
CREATE TABLE IF NOT EXISTS {tbl_base_name}BootCount( vbat,
packet_uuid TEXT PRIMARY KEY, output_enb_list,
generation_time TEXT, voltage_list,
Count NUM current_list,
)""" ),
) )
cursor.execute( except sqlite3.OperationalError as e:
f"INSERT INTO {tbl_base_name}BootCount VALUES(?, ?, ?)", _LOGGER.warning(f"SQLite error {e}")
(str(packet_uuid), packet_dt, boot_count),
)
for idx, name in enumerate(channel_list):
words = name.split()
camel_case_name = "".join(word.capitalize() for word in words)
tbl_name = f"{tbl_base_name}{camel_case_name}"
print(f"creating table {tbl_name}")
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {tbl_name}(
packet_uuid TEXT PRIMARY KEY,
generation_time TEXT,
out_enable NUM,
voltage NUM,
current NUM
)"""
)
value_tuple = (
str(packet_uuid),
packet_dt,
output_enb_list[idx],
voltage_list[idx],
current_list[idx],
)
print(value_tuple)
cursor.execute(
f"INSERT INTO {tbl_name} VALUES(?, ?, ?, ?, ?)", value_tuple
)
con.commit()
pw.dlog(info) pw.dlog(info)
def handle_pdu_db_insertion(
con: sqlite3.Connection,
packet_uuid: UUID,
packet_dt: datetime,
pdu_idx: int,
pdu_data: PduData,
):
cursor = con.cursor()
if pdu_idx == 1:
tbl_base_name = "pdu1"
channel_list = PDU1_CHANNELS_NAMES
else:
tbl_base_name = "pdu2"
channel_list = PDU2_CHANNELS_NAMES
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {tbl_base_name}(
packet_uuid TEXT PRIMARY KEY,
generation_time TEXT,
boot_count NUM,
bat_mode NUM,
temp REAL,
vcc NUM,
vbat NUM
)"""
)
cursor.execute(
f"INSERT INTO {tbl_base_name} VALUES(?, ?, ?, ?, ?, ?, ?)",
(
str(packet_uuid),
packet_dt,
pdu_data.boot_count,
pdu_data.batt_mode,
pdu_data.temperature,
pdu_data.vcc,
pdu_data.vbat,
),
)
for idx, name in enumerate(channel_list):
words = name.split()
camel_case_name = "_".join(word.lower() for word in words)
tbl_name = f"{tbl_base_name}_{camel_case_name}"
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {tbl_name}(
packet_uuid TEXT PRIMARY KEY,
generation_time TEXT,
out_enable NUM,
voltage NUM,
current NUM
)"""
)
value_tuple = (
str(packet_uuid),
packet_dt,
pdu_data.out_enables[idx],
pdu_data.voltages[idx],
pdu_data.currents[idx],
)
cursor.execute(f"INSERT INTO {tbl_name} VALUES(?, ?, ?, ?, ?)", value_tuple)
con.commit()
def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes): def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
if set_id == SetId.CORE: if set_id == SetId.CORE:
pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA") pw.dlog("Received P60 Core HK. Voltages in mV, currents in mA")