try to make MGM set HK data work
All checks were successful
Rust/sat-rs/pipeline/pr-main This commit looks good

This commit is contained in:
2024-05-10 17:55:11 +02:00
parent 9e096193dd
commit 37b32a9008
6 changed files with 95 additions and 26 deletions

View File

@ -1,15 +1,42 @@
import logging
import struct
from spacepackets.ecss.pus_3_hk import Subservice
from spacepackets.ecss import PusService, PusTc
from spacepackets.ecss import PusTm
from pytmtc.common import AcsId, Apid
from pytmtc.mgms import handle_mgm_hk_report
def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc:
app_data = bytearray()
app_data.extend(struct.pack("!I", unique_id))
app_data.extend(struct.pack("!I", set_id))
return PusTc(
service=PusService.S3_HOUSEKEEPING,
subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT,
apid=apid,
app_data=app_data,
_LOGGER = logging.getLogger(__name__)
def handle_hk_packet(pus_tm: PusTm):
if len(pus_tm.source_data) < 4:
raise ValueError("no unique ID in HK packet")
unique_id = struct.unpack("!I", pus_tm.source_data[:4])[0]
if (
pus_tm.subservice == Subservice.TM_HK_REPORT
or pus_tm.subservice == Subservice.TM_DIAGNOSTICS_REPORT
):
if len(pus_tm.source_data) < 8:
raise ValueError("no set ID in HK packet")
set_id = struct.unpack("!I", pus_tm.source_data[4:8])[0]
handle_hk_report(pus_tm, unique_id, set_id)
_LOGGER.warning(
f"handling for HK packet with subservice {pus_tm.subservice} not implemented yet"
)
def handle_hk_report(pus_tm: PusTm, unique_id: int, set_id: int):
hk_data = pus_tm.source_data[8:]
if pus_tm.apid == Apid.ACS:
if unique_id == AcsId.MGM_0:
handle_mgm_hk_report(pus_tm, set_id, hk_data)
else:
_LOGGER.warning(
f"handling for HK report with unique ID {unique_id} not implemented yet"
)
else:
_LOGGER.warning(
f"handling for HK report with apid {pus_tm.apid} not implemented yet"
)

View File

@ -0,0 +1,16 @@
import struct
from spacepackets.ecss import PusService, PusTc
from spacepackets.ecss.pus_3_hk import Subservice
def create_request_one_shot_hk_cmd(apid: int, unique_id: int, set_id: int) -> PusTc:
app_data = bytearray()
app_data.extend(struct.pack("!I", unique_id))
app_data.extend(struct.pack("!I", set_id))
return PusTc(
service=PusService.S3_HOUSEKEEPING,
subservice=Subservice.TC_GENERATE_ONE_PARAMETER_REPORT,
apid=apid,
app_data=app_data,
)

View File

@ -1,12 +1,18 @@
import logging
import struct
import enum
from typing import List
from spacepackets.ecss import PusTm
from tmtccmd.tmtc import DefaultPusQueueHelper
from pytmtc.common import AcsId, Apid
from pytmtc.hk import create_request_one_shot_hk_cmd
from pytmtc.hk_common import create_request_one_shot_hk_cmd
from pytmtc.mode import handle_set_mode_cmd
_LOGGER = logging.getLogger(__name__)
class SetId(enum.IntEnum):
SENSOR_SET = 0
@ -23,3 +29,17 @@ def create_mgm_cmds(q: DefaultPusQueueHelper, cmd_path: List[str]):
if cmd_path[2] == "mode":
if cmd_path[3] == "set_mode":
handle_set_mode_cmd(q, "MGM 0", cmd_path[4], Apid.ACS, AcsId.MGM_0)
def handle_mgm_hk_report(pus_tm: PusTm, set_id: int, hk_data: bytes):
if set_id == SetId.SENSOR_SET:
if len(hk_data) != 13:
raise ValueError(f"invalid HK data length, expected 13, got {len(hk_data)}")
data_valid = hk_data[0]
mgm_x = struct.unpack("!f", hk_data[1:5])[0]
mgm_y = struct.unpack("!f", hk_data[5:9])[0]
mgm_z = struct.unpack("!f", hk_data[9:13])[0]
_LOGGER.info(
f"received MGM HK set in uT: Valid {data_valid} X {mgm_x} Y {mgm_y} Z {mgm_z}"
)
pass

View File

@ -9,6 +9,7 @@ from tmtccmd.pus import VerificationWrapper
from tmtccmd.tmtc import GenericApidHandlerBase
from pytmtc.common import Apid, EventU32
from pytmtc.hk import handle_hk_packet
_LOGGER = logging.getLogger(__name__)
@ -53,16 +54,10 @@ class PusHandler(GenericApidHandlerBase):
self.verif_wrapper.log_to_console(tm_packet, res)
self.verif_wrapper.log_to_file(tm_packet, res)
elif service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
json_str = pus_tm.source_data[8:]
_LOGGER.info(json_str)
handle_hk_packet(pus_tm)
elif service == 5:
tm_packet = PusTm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE