Compare commits

..

10 Commits

8 changed files with 46 additions and 11 deletions

@@ -10,6 +10,10 @@ list yields a list of all related PRs for each release.
# [unreleased] # [unreleased]
## Added
- Added handling for new clock events.
# [v6.2.0] 2024-04-10 # [v6.2.0] 2024-04-10
## Added ## Added

@@ -15,6 +15,10 @@ TM_DB_PATH = "tm.db"
# Separate DB or not? Not sure.. # Separate DB or not? Not sure..
# RAW_TM_PATH = "raw_tm.db" # RAW_TM_PATH = "raw_tm.db"
# TODO: The cleanest way would be to load those from the config file..
PRINT_RAW_HK_B64_STR = False
PRINT_RAW_EVENTS_B64_STR = False
PUS_APID = 0x65 PUS_APID = 0x65
CFDP_APID = 0x66 CFDP_APID = 0x66
PUS_PACKET_ID = PacketId(PacketType.TM, True, PUS_APID) PUS_PACKET_ID = PacketId(PacketType.TM, True, PUS_APID)

@@ -4,7 +4,7 @@
import logging import logging
from typing import List, cast from typing import List, cast
from tmtccmd import DefaultProcedureInfo from tmtccmd import TreeCommandingProcedure
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@@ -78,7 +78,7 @@ from eive_tmtc.utility.input_helper import InputHelper
def handle_pus_procedure( def handle_pus_procedure(
info: DefaultProcedureInfo, info: TreeCommandingProcedure,
queue_helper: DefaultPusQueueHelper, queue_helper: DefaultPusQueueHelper,
): ):
cmd_path = info.cmd_path cmd_path = info.cmd_path

@@ -1,7 +1,9 @@
import logging import logging
import datetime import datetime
import sys import sys
import base64
from eive_tmtc.config.definitions import PRINT_RAW_EVENTS_B64_STR
from eive_tmtc.config.events import get_event_dict from eive_tmtc.config.events import get_event_dict
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
@@ -21,6 +23,8 @@ _LOGGER = logging.getLogger(__name__)
def handle_event_packet( # noqa C901: Complexity okay here def handle_event_packet( # noqa C901: Complexity okay here
raw_tm: bytes, pw: PrintWrapper raw_tm: bytes, pw: PrintWrapper
): # noqa C901: Complexity okay here ): # noqa C901: Complexity okay here
if PRINT_RAW_EVENTS_B64_STR:
print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}")
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty()) tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
event_dict = get_event_dict() event_dict = get_event_dict()
event_def = tm.event_definition event_def = tm.event_definition
@@ -36,6 +40,7 @@ def handle_event_packet( # noqa C901: Complexity okay here
obj_name = event_def.reporter_id.hex(sep=",") obj_name = event_def.reporter_id.hex(sep=",")
else: else:
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
assert tm.time_provider is not None
generic_event_string = ( generic_event_string = (
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})" f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})"
f" at {tm.time_provider.as_date_time()}" f" at {tm.time_provider.as_date_time()}"
@@ -128,12 +133,16 @@ def handle_event_packet( # noqa C901: Complexity okay here
time = event_def.param1 + event_def.param2 / 1000.0 time = event_def.param1 + event_def.param2 / 1000.0
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
pw.dlog(f"Current time: {time_dt}") pw.dlog(f"Current time: {time_dt}")
if info.name == "CLOCK_DUMP": if (
info.name == "CLOCK_DUMP"
or info.name == "CLOCK_DUMP_BEFORE_SETTING_TIME"
or info.name == "CLOCK_DUMP_AFTER_SETTING_TIME"
):
specific_handler = True specific_handler = True
# param 1 is timeval seconds, param 2 is timeval subsecond microseconds # param 1 is timeval seconds, param 2 is timeval subsecond microseconds
time = event_def.param1 + event_def.param2 / 1000000.0 time = event_def.param1 + event_def.param2 / 1000000.0
time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc) time_dt = datetime.datetime.fromtimestamp(time, datetime.timezone.utc)
pw.dlog(f"Current time: {time_dt}") pw.dlog(f"Clock dump event {info.name}. Current time: {time_dt}")
if info.name == "ACTIVE_SD_INFO": if info.name == "ACTIVE_SD_INFO":
sd_0_state = (event_def.param2 >> 16) & 0xFFFF sd_0_state = (event_def.param2 >> 16) & 0xFFFF
sd_1_state = event_def.param2 & 0xFFFF sd_1_state = event_def.param2 & 0xFFFF

@@ -6,6 +6,7 @@ import base64 # noqa
import sqlite3 import sqlite3
from typing import List, cast from typing import List, cast
from uuid import UUID from uuid import UUID
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
from eive_tmtc.pus_tm.hk import HkTmInfo from eive_tmtc.pus_tm.hk import HkTmInfo
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_hk_data
@@ -74,7 +75,8 @@ def handle_hk_packet(
if tm_packet.subservice == 25 or tm_packet.subservice == 26: if tm_packet.subservice == 25 or tm_packet.subservice == 26:
hk_data = tm_packet.tm_data[8:] hk_data = tm_packet.tm_data[8:]
if named_obj_id.as_bytes in hk_filter.object_ids: if named_obj_id.as_bytes in hk_filter.object_ids:
# print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") if PRINT_RAW_HK_B64_STR:
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
handle_regular_hk_print( handle_regular_hk_print(
printer=printer, printer=printer,
packet_uuid=packet_uuid, packet_uuid=packet_uuid,

@@ -29,10 +29,10 @@ classifiers = [
"Topic :: Scientific/Engineering" "Topic :: Scientific/Engineering"
] ]
dependencies = [ dependencies = [
"tmtccmd ~= 8.0.0rc1", # "tmtccmd~=8.0.0rc2",
"cfdp-py~=0.1.0", "cfdp-py~=0.1.1",
# "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main", "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main",
"python-dateutil ~= 2.8", "python-dateutil ~= 2.8",
] ]
[project.urls] [project.urls]

14
scripts/raw-analysis.py Executable file

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
from base64 import b64decode
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.ecss.pus_1_verification import Service1Tm, UnpackParams
bruh = "CGX6cQAdIAEIOzcAAEBedwUTOzkYZe5WAAEAAAAAAAAAAF4z"
data = b64decode(bruh)
tm = PusTelemetry.unpack(data, CdsShortTimestamp.empty())
srv1_tm = Service1Tm.from_tm(
tm, UnpackParams(time_reader=CdsShortTimestamp.empty(), bytes_err_code=2)
)
print(f"service {tm.service} subservice {tm.subservice}")
print(f"error code: {srv1_tm.error_code}")

@@ -97,8 +97,8 @@ def setup_params() -> Tuple[SetupWrapper, int]:
hk_level = int(post_arg_parsing_wrapper.args_raw.hk) hk_level = int(post_arg_parsing_wrapper.args_raw.hk)
else: else:
hk_level = 0 hk_level = 0
if params.app_params.print_tree: if params.tc_params.print_tree:
perform_tree_printout(params.app_params, hook_obj.get_command_definitions()) perform_tree_printout(params.tc_params, hook_obj.get_command_definitions())
sys.exit(0) sys.exit(0)
params.apid = PUS_APID params.apid = PUS_APID
if params.com_if is None: if params.com_if is None:
@@ -232,6 +232,7 @@ def main(): # noqa C901: Complexity okay here.
state = tmtc_backend.periodic_op(None) state = tmtc_backend.periodic_op(None)
tc_handler.cfdp_in_ccsds_handler.state_machine() tc_handler.cfdp_in_ccsds_handler.state_machine()
if state.request == BackendRequest.TERMINATION_NO_ERROR: if state.request == BackendRequest.TERMINATION_NO_ERROR:
tmtc_backend.close_com_if()
sys.exit(0) sys.exit(0)
elif state.request == BackendRequest.DELAY_IDLE: elif state.request == BackendRequest.DELAY_IDLE:
_LOGGER.info("TMTC Client in IDLE mode") _LOGGER.info("TMTC Client in IDLE mode")
@@ -252,6 +253,7 @@ def main(): # noqa C901: Complexity okay here.
elif state.request == BackendRequest.CALL_NEXT: elif state.request == BackendRequest.CALL_NEXT:
pass pass
except KeyboardInterrupt: except KeyboardInterrupt:
tmtc_backend.close_com_if()
sys.exit(0) sys.exit(0)