2024-04-09 11:18:54 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
import dataclasses
|
|
|
|
import enum
|
|
|
|
import struct
|
|
|
|
|
2024-04-10 15:39:53 +02:00
|
|
|
# Numeric identifier of this experiment.
EXPERIMENT_ID = 278
# APID used for this experiment: the experiment ID shifted by a fixed
# offset of 1024.
EXPERIMENT_APID = EXPERIMENT_ID + 1024
|
2024-04-09 11:18:54 +02:00
|
|
|
|
|
|
|
|
2024-04-22 15:47:25 +02:00
|
|
|
@enum.unique
class UniqueId(enum.IntEnum):
    """Integer identifiers for the components enumerated below.

    Values are contiguous, starting at 0. ``enum.unique`` makes the class
    definition fail with ``ValueError`` if two members are ever given the
    same value, instead of silently turning the second one into an alias.
    """

    Controller = 0
    PusEventManagement = 1
    PusRouting = 2
    PusTest = 3
    PusAction = 4
    PusMode = 5
    PusHk = 6
    UdpServer = 7
    TcpServer = 8
    TcpSppClient = 9
    PusScheduler = 10
    CameraHandler = 11
|
2024-04-22 15:47:25 +02:00
|
|
|
|
|
|
|
|
2024-04-09 11:18:54 +02:00
|
|
|
@enum.unique
class EventSeverity(enum.IntEnum):
    """Severity level of an event, with values 0 through 3.

    ``enum.unique`` rejects duplicate values at class-definition time so
    that a raw severity value always maps to exactly one named member.
    """

    INFO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass
class EventU32:
    """Event decoded from a 32-bit unsigned integer.

    Bit layout of the raw value, most significant bit first:

    * bits 31-30: severity (2 bits)
    * bits 29-16: group ID (14 bits)
    * bits 15-0: unique ID (16 bits)
    """

    severity: EventSeverity
    group_id: int
    unique_id: int

    @classmethod
    def unpack(cls, data: bytes) -> EventU32:
        """Build an event from the first four bytes of ``data``.

        The bytes are read as one big-endian (network order) unsigned
        32-bit integer; any bytes after the fourth are ignored.

        Raises:
            ValueError: If ``data`` holds fewer than four bytes.
        """
        if len(data) < 4:
            raise ValueError("passed data too short")
        (raw_value,) = struct.unpack("!I", data[:4])
        # Split the raw word into its three bit fields.
        severity_bits = (raw_value >> 30) & 0b11
        group_bits = (raw_value >> 16) & 0x3FFF
        unique_bits = raw_value & 0xFFFF
        return cls(
            severity=EventSeverity(severity_bits),
            group_id=group_bits,
            unique_id=unique_bits,
        )
|
|
|
|
|
|
|
|
|
|
|
|
class AcsId(enum.IntEnum):
    """Integer identifiers in the ACS group; currently only ``MGM_0``."""

    MGM_0 = 0
|
|
|
|
|
|
|
|
|
|
|
|
class AcsHkIds(enum.IntEnum):
    """Integer identifiers for ACS HK entries; currently only ``MGM_SET``."""

    MGM_SET = 1
|
|
|
|
|
|
|
|
|
2024-04-24 20:15:32 +02:00
|
|
|
def make_unique_id(unique_id: int) -> bytes:
    """Encode ``unique_id`` as a 4-byte big-endian unsigned integer.

    Raises:
        struct.error: If ``unique_id`` does not fit in an unsigned 32-bit
            integer.
    """
    encoded = struct.pack("!I", unique_id)
    return encoded
|
2024-04-24 17:39:23 +02:00
|
|
|
|
|
|
|
|
2024-04-24 20:15:32 +02:00
|
|
|
def make_action_cmd_header(unique_id: int, action_id: int) -> bytes:
    """Build the 8-byte header of an action command.

    The header is ``unique_id`` followed by ``action_id``, each encoded as
    a big-endian unsigned 32-bit integer.

    Returns:
        An immutable ``bytes`` object of length 8, matching the declared
        return type (the previous implementation returned a ``bytearray``).

    Raises:
        struct.error: If either argument does not fit in an unsigned
            32-bit integer.
    """
    # Network byte order ("!") uses no padding, so "!II" is exactly the
    # concatenation of two separate "!I" encodings.
    return struct.pack("!II", unique_id, action_id)
|