Merge branch 'main' into ploc-supv-tm-handling
Some checks are pending
EIVE/-/pipeline/pr-main Build queued...

This commit is contained in:
Marius Eggert 2024-04-29 11:43:14 +02:00
commit 3b903e5639
7 changed files with 56 additions and 14 deletions

View File

@ -10,6 +10,14 @@ list yields a list of all related PRs for each release.
# [unreleased] # [unreleased]
## Fixed
- GNSS commands working again (again).
## Added
- Added handling for new clock events.
# [v6.2.0] 2024-04-10 # [v6.2.0] 2024-04-10
## Added ## Added

View File

@ -15,6 +15,10 @@ TM_DB_PATH = "tm.db"
# Separate DB or not? Not sure.. # Separate DB or not? Not sure..
# RAW_TM_PATH = "raw_tm.db" # RAW_TM_PATH = "raw_tm.db"
# TODO: The cleanest way would be to load those from the config file..
PRINT_RAW_HK_B64_STR = False
PRINT_RAW_EVENTS_B64_STR = False
PUS_APID = 0x65 PUS_APID = 0x65
CFDP_APID = 0x66 CFDP_APID = 0x66
PUS_PACKET_ID = PacketId(PacketType.TM, True, PUS_APID) PUS_PACKET_ID = PacketId(PacketType.TM, True, PUS_APID)

View File

@ -227,7 +227,7 @@ def handle_acs_procedure(queue_helper: DefaultPusQueueHelper, cmd_path_list: Lis
object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[2] object_id=RW4_ID, rw_idx=4, q=queue_helper, cmd_str=cmd_path_list[2]
) )
if cmd_path_list[0] == "gnss_devs": if cmd_path_list[0] == "gnss_ctrl":
return pack_gps_command( return pack_gps_command(
object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[1] object_id=oids.GPS_CONTROLLER, q=queue_helper, cmd_str=cmd_path_list[1]
) )

View File

@ -1,7 +1,9 @@
import logging import logging
import datetime import datetime
import sys import sys
import base64
from eive_tmtc.config.definitions import PRINT_RAW_EVENTS_B64_STR
from eive_tmtc.config.events import get_event_dict from eive_tmtc.config.events import get_event_dict
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
@ -21,6 +23,8 @@ _LOGGER = logging.getLogger(__name__)
def handle_event_packet( # noqa C901: Complexity okay here def handle_event_packet( # noqa C901: Complexity okay here
raw_tm: bytes, pw: PrintWrapper raw_tm: bytes, pw: PrintWrapper
): # noqa C901: Complexity okay here ): # noqa C901: Complexity okay here
if PRINT_RAW_EVENTS_B64_STR:
print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}")
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty()) tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
event_dict = get_event_dict() event_dict = get_event_dict()
event_def = tm.event_definition event_def = tm.event_definition
@ -36,6 +40,7 @@ def handle_event_packet( # noqa C901: Complexity okay here
obj_name = event_def.reporter_id.hex(sep=",") obj_name = event_def.reporter_id.hex(sep=",")
else: else:
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
assert tm.time_provider is not None
generic_event_string = ( generic_event_string = (
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})" f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})"
f" at {tm.time_provider.as_date_time()}" f" at {tm.time_provider.as_date_time()}"
@ -128,12 +133,16 @@ def handle_event_packet( # noqa C901: Complexity okay here
time = event_def.param1 + event_def.param2 / 1000.0 time = event_def.param1 + event_def.param2 / 1000.0
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
pw.dlog(f"Current time: {time_dt}") pw.dlog(f"Current time: {time_dt}")
if info.name == "CLOCK_DUMP": if (
info.name == "CLOCK_DUMP"
or info.name == "CLOCK_DUMP_BEFORE_SETTING_TIME"
or info.name == "CLOCK_DUMP_AFTER_SETTING_TIME"
):
specific_handler = True specific_handler = True
# param 1 is timeval seconds, param 2 is timeval subsecond microseconds # param 1 is timeval seconds, param 2 is timeval subsecond microseconds
time = event_def.param1 + event_def.param2 / 1000000.0 time = event_def.param1 + event_def.param2 / 1000000.0
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
pw.dlog(f"Current time: {time_dt}") pw.dlog(f"Clock dump event {info.name}. Current time: {time_dt}")
if info.name == "ACTIVE_SD_INFO": if info.name == "ACTIVE_SD_INFO":
sd_0_state = (event_def.param2 >> 16) & 0xFFFF sd_0_state = (event_def.param2 >> 16) & 0xFFFF
sd_1_state = event_def.param2 & 0xFFFF sd_1_state = event_def.param2 & 0xFFFF

View File

@ -6,6 +6,7 @@ import base64 # noqa
import sqlite3 import sqlite3
from typing import List, cast from typing import List, cast
from uuid import UUID from uuid import UUID
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
from eive_tmtc.pus_tm.hk import HkTmInfo from eive_tmtc.pus_tm.hk import HkTmInfo
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
@ -74,7 +75,8 @@ def handle_hk_packet(
if tm_packet.subservice == 25 or tm_packet.subservice == 26: if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:] hk_data = tm_packet.tm_data[8:]
if named_obj_id.as_bytes in hk_filter.object_ids: if named_obj_id.as_bytes in hk_filter.object_ids:
# print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") if PRINT_RAW_HK_B64_STR:
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
handle_regular_hk_print( handle_regular_hk_print(
printer=printer, printer=printer,
packet_uuid=packet_uuid, packet_uuid=packet_uuid,

View File

@ -23,7 +23,11 @@ class GpsInfo:
MAX_SATELLITES = 30 MAX_SATELLITES = 30
class GnssChip(enum.IntEnum): class ActIds:
RESET_GNSS = 5
class AcsBoardSides(enum.IntEnum):
A_SIDE = 0 A_SIDE = 0
B_SIDE = 1 B_SIDE = 1
@ -37,7 +41,7 @@ class OpCode:
REQ_SKYVIEW_HK = "skyview_hk_request" REQ_SKYVIEW_HK = "skyview_hk_request"
ENABLE_SKYVIEW_HK = "skyview_hk_enable" ENABLE_SKYVIEW_HK = "skyview_hk_enable"
DISABLE_SKYVIEW_HK = "skyview_hk_disable" DISABLE_SKYVIEW_HK = "skyview_hk_disable"
RESET_GNSS = "reset" RESET_GNSS = "reset_gnss"
class Info: class Info:
@ -63,7 +67,7 @@ def create_gnss_node() -> CmdTreeNode:
] ]
info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")] info_strs = [getattr(Info, key) for key in dir(OpCode) if not key.startswith("__")]
combined_dict = dict(zip(op_code_strs, info_strs)) combined_dict = dict(zip(op_code_strs, info_strs))
node = CmdTreeNode("gnss_devs", "GNSS Controller", hide_children_for_print=True) node = CmdTreeNode("gnss_ctrl", "GNSS Ctrl", hide_children_for_print=True)
for op_code, info in combined_dict.items(): for op_code, info in combined_dict.items():
node.add_child(CmdTreeNode(op_code, info)) node.add_child(CmdTreeNode(op_code, info))
return node return node
@ -73,16 +77,17 @@ def pack_gps_command( # noqa: C901
object_id: bytes, q: DefaultPusQueueHelper, cmd_str: str object_id: bytes, q: DefaultPusQueueHelper, cmd_str: str
): # noqa: C901: ): # noqa: C901:
if cmd_str == OpCode.RESET_GNSS: if cmd_str == OpCode.RESET_GNSS:
for val in GnssChip: for val in AcsBoardSides:
print("{:<2}: {:<20}".format(val, val.name)) print("{:<2}: {:<20}".format(val, val.name))
chip: str = "" board_side = int(input("Select Board Side \n" ""))
while chip not in ["0", "1"]: q.add_log_cmd(f"GPS: {Info.RESET_GNSS}")
chip = input("Please specify which chip to reset: ")
q.add_log_cmd(f"gps: {Info.DISABLE_CORE_HK}")
q.add_pus_tc( q.add_pus_tc(
create_action_cmd(object_id=object_id, action_id=5, user_data=chip.encode()) create_action_cmd(
object_id=object_id,
action_id=ActIds.RESET_GNSS,
user_data=bytearray([board_side]),
)
) )
_LOGGER.warning("Reset pin handling needs to be re-implemented")
if cmd_str == OpCode.ENABLE_CORE_HK: if cmd_str == OpCode.ENABLE_CORE_HK:
interval = float(input("Please specify interval in floating point seconds: ")) interval = float(input("Please specify interval in floating point seconds: "))
if interval <= 0: if interval <= 0:

14
scripts/raw-analysis.py Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
from base64 import b64decode
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams
bruh = "CGX6cQAdIAEIOzcAAEBedwUTOzkYZe5WAAEAAAAAAAAAAF4z"
data = b64decode(bruh)
tm = PusTelemetry.unpack(data, CdsShortTimestamp.empty())
srv1_tm = Service1Tm.from_tm(
tm, UnpackParams(time_reader=CdsShortTimestamp.empty(), bytes_err_code=2)
)
print(f"service {tm.service} subservice {tm.subservice}")
print(f"error code: {srv1_tm.error_code}")