Compare commits

...

18 Commits
v7.0.0 ... main

Author SHA1 Message Date
6e85b1add8 Merge pull request 'prep next release' (#302) from prep-v7.2.0 into main
Reviewed-on: #302
2025-01-17 11:25:20 +01:00
73a7c6ec14
Merge remote-tracking branch 'origin/main' into prep-v7.2.0 2025-01-17 11:22:29 +01:00
28161059ca Merge pull request 'Mode handling fix' (#300) from mode-handling-fix into main
Reviewed-on: #300
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
2025-01-17 11:22:01 +01:00
52d434630b
prep next release 2025-01-17 11:09:23 +01:00
55a357b52c Merge branch 'main' into mode-handling-fix
All checks were successful
EIVE/-/pipeline/pr-main This commit looks good
2024-05-28 09:40:46 +02:00
2d6e9f826c Merge pull request 'action reply update' (#299) from small-action-reply-update into main
All checks were successful
EIVE/-/pipeline/head This commit looks good
Reviewed-on: #299
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
2024-05-28 09:40:21 +02:00
20489a02d2
fix for future versions of tmtccmd
All checks were successful
EIVE/-/pipeline/pr-main This commit looks good
2024-05-23 19:00:51 +02:00
652f3bf66f
changelog
All checks were successful
EIVE/-/pipeline/pr-main This commit looks good
2024-05-23 17:40:55 +02:00
8419a4edd7
some more fixes
All checks were successful
EIVE/-/pipeline/head This commit looks good
2024-05-23 14:45:10 +02:00
673c8b2cf2
mode handling fix
All checks were successful
EIVE/-/pipeline/head This commit looks good
2024-05-23 13:42:32 +02:00
e23b39e652
action reply update
All checks were successful
EIVE/-/pipeline/head This commit looks good
EIVE/-/pipeline/pr-main This commit looks good
2024-05-16 11:03:55 +02:00
9a55686a26 Merge pull request 'Some more fixes' (#298) from some-more-fixes-for-tmtccmd-bump into main
All checks were successful
EIVE/-/pipeline/head This commit looks good
Reviewed-on: #298
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
2024-05-13 14:06:00 +02:00
067a016040
another small fix
All checks were successful
EIVE/-/pipeline/head This commit looks good
EIVE/-/pipeline/pr-main This commit looks good
2024-05-08 10:56:16 +02:00
dccbf89da1
some more fixes
All checks were successful
EIVE/-/pipeline/head This commit looks good
2024-05-08 10:49:20 +02:00
5cf76c07e9 Merge pull request 'Bump tmtccmd and fix CFDP' (#297) from bump-tmtccmd-fix-cfdp into main
All checks were successful
EIVE/-/pipeline/head This commit looks good
Reviewed-on: #297
Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
2024-05-07 14:17:40 +02:00
bc53f7e81a Merge remote-tracking branch 'origin/main' into bump-tmtccmd-fix-cfdp
Some checks are pending
EIVE/-/pipeline/head Build started...
2024-05-07 14:10:05 +02:00
d6b879da67 bump tmtccmd and fix CFDP code
Some checks are pending
EIVE/-/pipeline/head Build started...
2024-05-07 14:07:18 +02:00
86a68e25f7 need to fix action reply handler
All checks were successful
EIVE/-/pipeline/head This commit looks good
2024-05-06 11:23:31 +02:00
37 changed files with 257 additions and 235 deletions

View File

@ -10,6 +10,14 @@ list yields a list of all related PRs for each release.
# [unreleased] # [unreleased]
# [v7.1.0] 2025-01-17
- Bumped `tmtccmd` to v8.1.0
## Fixed
- Use new mode TM API.
# [v7.0.0] 2024-05-06 # [v7.0.0] 2024-05-06
- Reworked PLOC MPSoC commanding to be inline with OBSW update. - Reworked PLOC MPSoC commanding to be inline with OBSW update.

View File

@ -62,13 +62,7 @@ class CfdpHandler:
) )
def put_request(self, request: PutRequest): def put_request(self, request: PutRequest):
if not self.remote_cfg_table.get_cfg(request.destination_id): self.source_handler.put_request(request)
raise ValueError(
f"No remote CFDP config found for entity ID {request.destination_id}"
)
self.source_handler.put_request(
request, self.remote_cfg_table.get_cfg(request.destination_id) # type: ignore
)
def pull_next_source_packet(self) -> Optional[PduHolder]: def pull_next_source_packet(self) -> Optional[PduHolder]:
res = self.source_handler.state_machine() res = self.source_handler.state_machine()

View File

@ -17,6 +17,7 @@ TM_DB_PATH = "tm.db"
# TODO: The cleanest way would be to load those from the config file.. # TODO: The cleanest way would be to load those from the config file..
PRINT_RAW_HK_B64_STR = False PRINT_RAW_HK_B64_STR = False
PRINT_RAW_ACTION_DATA_REPLY_B64_STR = True
PRINT_RAW_EVENTS_B64_STR = False PRINT_RAW_EVENTS_B64_STR = False
PUS_APID = 0x65 PUS_APID = 0x65

View File

@ -52,7 +52,7 @@ from eive_tmtc.tmtc.wdt import create_wdt_node
class EiveHookObject(HookBase): class EiveHookObject(HookBase):
def __init__(self, json_cfg_path: str): def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path) super().__init__(json_cfg_path)
def get_command_definitions(self) -> CmdTreeNode: def get_command_definitions(self) -> CmdTreeNode:
root_node = CmdTreeNode.root_node() root_node = CmdTreeNode.root_node()

View File

@ -4,7 +4,7 @@
import logging import logging
from typing import List, cast from typing import List, cast
from tmtccmd import DefaultProcedureInfo from tmtccmd import TreeCommandingProcedure
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
@ -78,7 +78,7 @@ from eive_tmtc.utility.input_helper import InputHelper
def handle_pus_procedure( def handle_pus_procedure(
info: DefaultProcedureInfo, info: TreeCommandingProcedure,
queue_helper: DefaultPusQueueHelper, queue_helper: DefaultPusQueueHelper,
): ):
cmd_path = info.cmd_path cmd_path = info.cmd_path

View File

@ -68,8 +68,8 @@ class TcHandler(TcHandlerBase):
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper): def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT: if info.proc_type == TcProcedureType.TREE_COMMANDING:
handle_pus_procedure(info.to_def_procedure(), self.queue_helper) handle_pus_procedure(info.to_tree_commanding_procedure(), self.queue_helper)
elif info.proc_type == TcProcedureType.CFDP: elif info.proc_type == TcProcedureType.CFDP:
self.handle_cfdp_procedure(info) self.handle_cfdp_procedure(info)
@ -155,7 +155,7 @@ class TcHandler(TcHandlerBase):
def queue_finished_cb(self, info: ProcedureWrapper): def queue_finished_cb(self, info: ProcedureWrapper):
if info is not None: if info is not None:
if info.proc_type == TcQueueEntryType.PUS_TC: if info.proc_type == TcQueueEntryType.PUS_TC:
def_proc = info.to_def_procedure() def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Finished queue for command {def_proc.cmd_path}") _LOGGER.info(f"Finished queue for command {def_proc.cmd_path}")
elif info.proc_type == TcProcedureType.CFDP: elif info.proc_type == TcProcedureType.CFDP:
_LOGGER.info("Finished CFDP queue") _LOGGER.info("Finished CFDP queue")

View File

@ -1,5 +1,10 @@
import base64
import logging import logging
import struct import struct
from spacepackets.ecss import PusTm
from tmtccmd.pus.s8_fsfw_action_defs import CustomSubservice
from eive_tmtc.config.definitions import PRINT_RAW_ACTION_DATA_REPLY_B64_STR
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
ACU_HANDLER_ID, ACU_HANDLER_ID,
PDU_1_HANDLER_ID, PDU_1_HANDLER_ID,
@ -20,56 +25,76 @@ from eive_tmtc.tmtc.payload.ploc_supervisor import SupvActionId
from eive_tmtc.tmtc.acs.star_tracker import handle_star_tracker_action_replies from eive_tmtc.tmtc.acs.star_tracker import handle_star_tracker_action_replies
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_action_replies from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_action_replies
from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply
from tmtccmd.pus.s8_fsfw_action import Service8FsfwTm
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd.util import ObjectIdDictT from tmtccmd.util import ObjectIdBase, ObjectIdDictT, ObjectIdU32
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def handle_action_reply( _LOGGER = logging.getLogger(__name__)
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
def handle_action_service_tm(
raw_tm: bytes, pw: PrintWrapper, obj_id_dict: ObjectIdDictT
): ):
"""Core Action reply handler """Core Action reply handler
:return: :return:
""" """
tm_packet = Service8FsfwTm.unpack( tm_packet = PusTm.unpack(raw_tm, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE)
raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty() if len(tm_packet.source_data) < 8:
) _LOGGER.warning(
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes) "received action service reply with source data smaller than 8 bytes"
pw = PrintWrapper(printer.file_logger) )
custom_data = tm_packet.custom_data return
action_id = tm_packet.action_id object_id_raw = struct.unpack("!I", tm_packet.source_data[0:4])[0]
generic_print_str = printer.generic_action_packet_tm_print( action_id = struct.unpack("!I", tm_packet.source_data[4:8])[0]
packet=tm_packet, obj_id=object_id object_id = obj_id_dict.get(object_id_raw)
) custom_data = tm_packet.source_data[8:]
pw.dlog(generic_print_str) if object_id is None:
if object_id.as_bytes == IMTQ_HANDLER_ID: object_id = ObjectIdU32(object_id_raw, "Unknown ID")
return handle_imtq_replies(action_id, pw, custom_data) if tm_packet.subservice == CustomSubservice.TM_DATA_REPLY:
elif object_id.as_bytes == PLOC_MPSOC_ID: if PRINT_RAW_ACTION_DATA_REPLY_B64_STR:
return handle_mpsoc_data_reply(action_id, pw, custom_data) print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
elif object_id.as_bytes == PLOC_SUPV_ID: if object_id.as_bytes == IMTQ_HANDLER_ID:
return handle_supervisor_replies(action_id, pw, custom_data) return handle_imtq_replies(action_id, pw, custom_data)
elif object_id.as_bytes == CORE_CONTROLLER_ID: elif object_id.as_bytes == PLOC_MPSOC_ID:
return handle_core_ctrl_action_replies(action_id, pw, custom_data) return handle_mpsoc_data_reply(action_id, pw, custom_data)
elif object_id.as_bytes == STAR_TRACKER_ID: elif object_id.as_bytes == PLOC_SUPV_ID:
return handle_star_tracker_action_replies(action_id, pw, custom_data) return handle_supervisor_replies(action_id, pw, custom_data)
elif object_id.as_bytes == ACS_CONTROLLER: elif object_id.as_bytes == CORE_CONTROLLER_ID:
return handle_acs_ctrl_action_replies(action_id, pw, custom_data) return handle_core_ctrl_action_replies(action_id, pw, custom_data)
elif object_id.as_bytes in [ elif object_id.as_bytes == STAR_TRACKER_ID:
ACU_HANDLER_ID, return handle_star_tracker_action_replies(action_id, pw, custom_data)
PDU_1_HANDLER_ID, elif object_id.as_bytes == ACS_CONTROLLER:
PDU_2_HANDLER_ID, return handle_acs_ctrl_action_replies(action_id, pw, custom_data)
P60_DOCK_HANDLER, elif object_id.as_bytes in [
]: ACU_HANDLER_ID,
return handle_get_param_data_reply(object_id, action_id, pw, custom_data) PDU_1_HANDLER_ID,
PDU_2_HANDLER_ID,
P60_DOCK_HANDLER,
]:
return handle_get_param_data_reply(object_id, action_id, pw, custom_data)
else:
# TODO: Could add a handler here depending on action ID and object ID.
handle_action_data_reply(tm_packet, object_id, action_id, pw)
else: else:
pw.dlog(f"No dedicated action reply handler found for reply from {object_id}") pw.dlog(
pw.dlog(f"Raw Data: {tm_packet.custom_data.hex(sep=',')}") f"service 8 packet from {object_id} with action ID {action_id} "
f"and unknown subservice {tm_packet.subservice}"
)
def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray): def handle_action_data_reply(
tm_packet: PusTm, named_obj_id: ObjectIdBase, action_id: int, printer: PrintWrapper
):
print_string = (
f"service 8 data reply from {named_obj_id} with action ID {action_id} "
f"and data size {len(tm_packet.tm_data[8:])}"
)
printer.dlog(print_string)
def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytes):
if action_id == struct.unpack("!I", ImtqActionId.get_commanded_dipole)[0]: if action_id == struct.unpack("!I", ImtqActionId.get_commanded_dipole)[0]:
header_list = [ header_list = [
"Commanded X-Dipole", "Commanded X-Dipole",
@ -82,7 +107,7 @@ def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray
pw.dlog(f"{content_list}") pw.dlog(f"{content_list}")
def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray): def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: bytes):
if action_id == SupvActionId.DUMP_MRAM: if action_id == SupvActionId.DUMP_MRAM:
header_list = ["MRAM Dump"] header_list = ["MRAM Dump"]
content_list = [custom_data[: len(custom_data)]] content_list = [custom_data[: len(custom_data)]]

View File

@ -25,7 +25,7 @@ def handle_event_packet( # noqa C901: Complexity okay here
): # noqa C901: Complexity okay here ): # noqa C901: Complexity okay here
if PRINT_RAW_EVENTS_B64_STR: if PRINT_RAW_EVENTS_B64_STR:
print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}") print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}")
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty()) tm = Service5Tm.unpack(data=raw_tm, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE)
event_dict = get_event_dict() event_dict = get_event_dict()
event_def = tm.event_definition event_def = tm.event_definition
info = event_dict.get(event_def.event_id) info = event_dict.get(event_def.event_id)
@ -40,10 +40,10 @@ def handle_event_packet( # noqa C901: Complexity okay here
obj_name = event_def.reporter_id.hex(sep=",") obj_name = event_def.reporter_id.hex(sep=",")
else: else:
obj_name = obj_id_obj.name obj_name = obj_id_obj.name
assert tm.time_provider is not None timestamp = CdsShortTimestamp.unpack(tm.timestamp)
generic_event_string = ( generic_event_string = (
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})" f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})"
f" at {tm.time_provider.as_date_time()}" f" at {timestamp.as_date_time()}"
) )
_LOGGER.info(generic_event_string) _LOGGER.info(generic_event_string)
pw.file_logger.info( pw.file_logger.info(

View File

@ -2,20 +2,17 @@ import uuid
import dataclasses import dataclasses
import datetime import datetime
import sqlite3 import sqlite3
from tmtccmd.pus.tm.s3_fsfw_hk import Service3FsfwTm from spacepackets.ecss.tm import CdsShortTimestamp, PusTm
@dataclasses.dataclass @dataclasses.dataclass
class HkTmInfo: class HkTmInfo:
packet_uuid: uuid.UUID packet_uuid: uuid.UUID
hk_packet: Service3FsfwTm hk_packet: PusTm
set_id: int
db_con: sqlite3.Connection db_con: sqlite3.Connection
hk_data: bytes hk_data: bytes
@property @property
def packet_datetime(self) -> datetime.datetime: def packet_datetime(self) -> datetime.datetime:
return self.hk_packet.pus_tm.time_provider.as_datetime() return CdsShortTimestamp.unpack(self.hk_packet.timestamp).as_datetime()
@property
def set_id(self) -> int:
return self.hk_packet.set_id

View File

@ -2,10 +2,14 @@
import dataclasses import dataclasses
import logging import logging
import base64 # noqa import base64
import sqlite3 import sqlite3
import struct
from typing import List, cast from typing import List, cast
from uuid import UUID from uuid import UUID
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss import PusTm
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
from eive_tmtc.pus_tm.hk import HkTmInfo from eive_tmtc.pus_tm.hk import HkTmInfo
@ -23,9 +27,6 @@ from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data
from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data
from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data
from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data
from tmtccmd.pus.tm.s3_fsfw_hk import (
Service3FsfwTm,
)
from tmtccmd.pus.tm.s3_hk_base import HkContentType from tmtccmd.pus.tm.s3_hk_base import HkContentType
from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT
@ -68,17 +69,20 @@ def handle_hk_packet(
hk_level: int, hk_level: int,
db_con: sqlite3.Connection, db_con: sqlite3.Connection,
): ):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False) tm_packet = PusTm.unpack(raw_tm, CdsShortTimestamp.TIMESTAMP_SIZE)
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(tm_packet.object_id.as_bytes)) obj_id_raw = struct.unpack("!I", tm_packet.tm_data[0:4])[0]
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(obj_id_raw))
if named_obj_id is None: if named_obj_id is None:
named_obj_id = tm_packet.object_id named_obj_id = ObjectIdU32(obj_id_raw, "Unknown ID")
if tm_packet.subservice == 25 or tm_packet.subservice == 26: if tm_packet.subservice == 25 or tm_packet.subservice == 26:
set_id = struct.unpack("!I", tm_packet.tm_data[4:8])[0]
hk_data = tm_packet.tm_data[8:] hk_data = tm_packet.tm_data[8:]
if named_obj_id.as_bytes in hk_filter.object_ids: if named_obj_id.as_bytes in hk_filter.object_ids:
if PRINT_RAW_HK_B64_STR: if PRINT_RAW_HK_B64_STR:
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}") print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
handle_regular_hk_print( handle_regular_hk_print(
printer=printer, printer=printer,
set_id=set_id,
packet_uuid=packet_uuid, packet_uuid=packet_uuid,
object_id=named_obj_id, object_id=named_obj_id,
hk_packet=tm_packet, hk_packet=tm_packet,
@ -89,11 +93,12 @@ def handle_hk_packet(
try: try:
if hk_level >= 1: if hk_level >= 1:
printer.generic_hk_tm_print( printer.generic_hk_tm_print(
HkContentType.HK, named_obj_id, tm_packet.set_id, hk_data HkContentType.HK, named_obj_id, set_id, hk_data
) )
if hk_level >= 1: if hk_level >= 1:
handle_regular_hk_print( handle_regular_hk_print(
printer=printer, printer=printer,
set_id=set_id,
packet_uuid=packet_uuid, packet_uuid=packet_uuid,
object_id=named_obj_id, object_id=named_obj_id,
hk_packet=tm_packet, hk_packet=tm_packet,
@ -109,7 +114,8 @@ def handle_hk_packet(
def handle_regular_hk_print( # noqa C901: Complexity okay here def handle_regular_hk_print( # noqa C901: Complexity okay here
hk_packet: Service3FsfwTm, hk_packet: PusTm,
set_id: int,
packet_uuid: UUID, packet_uuid: UUID,
hk_data: bytes, hk_data: bytes,
db: sqlite3.Connection, db: sqlite3.Connection,
@ -117,12 +123,15 @@ def handle_regular_hk_print( # noqa C901: Complexity okay here
printer: FsfwTmTcPrinter, printer: FsfwTmTcPrinter,
): ):
objb = object_id.as_bytes objb = object_id.as_bytes
set_id = hk_packet.set_id
hk_info = HkTmInfo( hk_info = HkTmInfo(
packet_uuid=packet_uuid, hk_packet=hk_packet, db_con=db, hk_data=hk_data packet_uuid=packet_uuid,
hk_packet=hk_packet,
db_con=db,
hk_data=hk_data,
set_id=set_id,
) )
assert hk_packet.pus_tm.time_provider is not None timestamp = CdsShortTimestamp.unpack(hk_packet.timestamp)
packet_dt = hk_packet.pus_tm.time_provider.as_date_time() packet_dt = timestamp.as_datetime()
pw = PrintWrapper(printer.file_logger) pw = PrintWrapper(printer.file_logger)
"""This function is called when a Service 3 Housekeeping packet is received.""" """This function is called when a Service 3 Housekeeping packet is received."""
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]: if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
@ -139,7 +148,7 @@ def handle_regular_hk_print( # noqa C901: Complexity okay here
pw=pw, pw=pw,
set_id=set_id, set_id=set_id,
hk_data=hk_data, hk_data=hk_data,
packet_time=packet_dt, packet_time=timestamp.as_datetime(),
) )
elif objb == obj_ids.PCDU_HANDLER_ID: elif objb == obj_ids.PCDU_HANDLER_ID:
return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data) return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)

View File

@ -15,14 +15,14 @@ from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
from tmtccmd.pus import VerificationWrapper from tmtccmd.pus import VerificationWrapper
from tmtccmd.pus.s20_fsfw_param import Service20FsfwTm, Service20ParamDumpWrapper from tmtccmd.pus.s20_fsfw_param import Service20FsfwTm, Service20ParamDumpWrapper
from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice
from tmtccmd.pus.s200_fsfw_mode import Service200FsfwTm from tmtccmd.pus.s200_fsfw_mode import Service200FsfwReader, Service200FsfwTm
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
from tmtccmd.tmtc import GenericApidHandlerBase, SpecificApidHandlerBase from tmtccmd.tmtc import GenericApidHandlerBase, SpecificApidHandlerBase
from eive_tmtc.config.definitions import TM_DB_PATH, PUS_APID from eive_tmtc.config.definitions import TM_DB_PATH, PUS_APID
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from .action_reply_handler import handle_action_reply from .action_reply_handler import handle_action_service_tm
from .defs import PrintWrapper from .defs import PrintWrapper
from .event_handler import handle_event_packet from .event_handler import handle_event_packet
from .hk_handler import HkFilter, handle_hk_packet from .hk_handler import HkFilter, handle_hk_packet
@ -63,15 +63,22 @@ class PusHandler(SpecificApidHandlerBase):
_LOGGER.warning("Detected packet shorter than 8 bytes!") _LOGGER.warning("Detected packet shorter than 8 bytes!")
return return
try: try:
tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.empty()) tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
# _LOGGER.info(f"Sequence count: {tm_packet.seq_count}") # _LOGGER.info(f"Sequence count: {tm_packet.seq_count}")
except ValueError as value_error: except ValueError as value_error:
_LOGGER.warning(f"{value_error}") _LOGGER.warning(f"{value_error}")
_LOGGER.warning("Could not generate PUS TM object from raw data") _LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}") _LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
return return
timestamp = CdsShortTimestamp.unpack(tm_packet.timestamp)
db_con = sqlite3.connect(TM_DB_PATH) db_con = sqlite3.connect(TM_DB_PATH)
self._store_packet_in_db(db_con, packet, tm_packet, packet_uuid) self._store_packet_in_db(
db_con=db_con,
packet=packet,
tm_packet=tm_packet,
timestamp=timestamp,
packet_uuid=packet_uuid,
)
service = tm_packet.service service = tm_packet.service
dedicated_handler = True dedicated_handler = True
if service == 1: if service == 1:
@ -89,12 +96,12 @@ class PusHandler(SpecificApidHandlerBase):
elif service == 5: elif service == 5:
handle_event_packet(raw_tm=packet, pw=self.pw) handle_event_packet(raw_tm=packet, pw=self.pw)
elif service == 8: elif service == 8:
handle_action_reply( handle_action_service_tm(
raw_tm=packet, printer=self.printer, obj_id_dict=self.obj_id_dict raw_tm=packet, pw=self.pw, obj_id_dict=self.obj_id_dict
) )
elif service == 17: elif service == 17:
pus17_tm = Service17Tm.unpack( pus17_tm = Service17Tm.unpack(
data=packet, time_reader=CdsShortTimestamp.empty() data=packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
) )
if pus17_tm.subservice == 2: if pus17_tm.subservice == 2:
self.verif_wrapper.dlog("Received Ping Reply TM[17,2]") self.verif_wrapper.dlog("Received Ping Reply TM[17,2]")
@ -102,7 +109,7 @@ class PusHandler(SpecificApidHandlerBase):
elif service == 20: elif service == 20:
self._handle_param_packet(packet, tm_packet) self._handle_param_packet(packet, tm_packet)
elif service == 200: elif service == 200:
dedicated_handler = self._handle_mode_packet(packet, tm_packet) dedicated_handler = self._handle_mode_packet(tm_packet)
else: else:
_LOGGER.info( _LOGGER.info(
f"The service {service} is not implemented in Telemetry Factory" f"The service {service} is not implemented in Telemetry Factory"
@ -119,11 +126,11 @@ class PusHandler(SpecificApidHandlerBase):
self, self,
db_con: sqlite3.Connection, db_con: sqlite3.Connection,
packet: bytes, packet: bytes,
timestamp: CdsShortTimestamp,
tm_packet: PusTelemetry, tm_packet: PusTelemetry,
packet_uuid: uuid.UUID, packet_uuid: uuid.UUID,
): ):
cursor = db_con.cursor() cursor = db_con.cursor()
assert tm_packet.time_provider is not None
cursor.execute( cursor.execute(
""" """
CREATE TABLE IF NOT EXISTS pus_tm( CREATE TABLE IF NOT EXISTS pus_tm(
@ -139,7 +146,7 @@ class PusHandler(SpecificApidHandlerBase):
"INSERT INTO pus_tm VALUES(?, ?, ?, ?, ?, ?)", "INSERT INTO pus_tm VALUES(?, ?, ?, ?, ?, ?)",
( (
str(packet_uuid), str(packet_uuid),
tm_packet.time_provider.as_datetime(), timestamp.as_datetime(),
tm_packet.service, tm_packet.service,
tm_packet.subservice, tm_packet.subservice,
len(packet), len(packet),
@ -150,7 +157,7 @@ class PusHandler(SpecificApidHandlerBase):
def _handle_param_packet(self, raw_data: bytes, tm_packet: PusTelemetry): def _handle_param_packet(self, raw_data: bytes, tm_packet: PusTelemetry):
param_packet = Service20FsfwTm.unpack( param_packet = Service20FsfwTm.unpack(
raw_telemetry=raw_data, time_reader=CdsShortTimestamp.empty() raw_telemetry=raw_data, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
) )
if tm_packet.subservice == ParamSubservice.TM_DUMP_REPLY: if tm_packet.subservice == ParamSubservice.TM_DUMP_REPLY:
param_wrapper = Service20ParamDumpWrapper(param_tm=param_packet) param_wrapper = Service20ParamDumpWrapper(param_tm=param_packet)
@ -186,21 +193,21 @@ class PusHandler(SpecificApidHandlerBase):
f"unknown subservice {tm_packet.subservice} for parameter service" f"unknown subservice {tm_packet.subservice} for parameter service"
) )
def _handle_mode_packet(self, raw_data: bytes, _: PusTelemetry) -> bool: def _handle_mode_packet(self, pus_tm: PusTelemetry) -> bool:
tm_packet = Service200FsfwTm.unpack( tm_packet = Service200FsfwReader(pus_tm)
raw_telemetry=raw_data, time_reader=CdsShortTimestamp.empty() if tm_packet.tm.subservice == ModeSubservice.TM_CANT_REACH_MODE:
)
if tm_packet.subservice == ModeSubservice.TM_CANT_REACH_MODE:
obj_id = tm_packet.object_id obj_id = tm_packet.object_id
obj_id_obj = self.obj_id_dict.get(obj_id) obj_id_obj = self.obj_id_dict.get(obj_id)
retval = tm_packet.return_value retval = tm_packet.return_value
assert retval is not None
string_list = generic_retval_printout(retval) string_list = generic_retval_printout(retval)
self.pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.") self.pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.")
for string in string_list: for string in string_list:
self.pw.dlog(f"Reason: {string}") self.pw.dlog(f"Reason: {string}")
return True return True
if tm_packet.subservice == ModeSubservice.TM_WRONG_MODE_REPLY: if tm_packet.tm.subservice == ModeSubservice.TM_WRONG_MODE_REPLY:
self.pw.dlog(f"Received Mode TM wrong mode reply, mode: {tm_packet.mode}") self.pw.dlog(f"Received Mode TM wrong mode reply, mode: {tm_packet.mode}")
return True
return False return False

View File

@ -17,7 +17,7 @@ def handle_service_1_fsfw_packet(wrapper: VerificationWrapper, raw_tm: bytes):
) )
# Error code with length 2 is FSFW specific # Error code with length 2 is FSFW specific
tm_packet = Service1Tm.unpack( tm_packet = Service1Tm.unpack(
data=raw_tm, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2) data=raw_tm, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
) )
fsfw_wrapper = Service1FsfwWrapper(tm_packet) fsfw_wrapper = Service1FsfwWrapper(tm_packet)
res = wrapper.verificator.add_tm(tm_packet) res = wrapper.verificator.add_tm(tm_packet)

View File

@ -8,7 +8,7 @@ from socket import AF_INET
from typing import Tuple from typing import Tuple
from tmtccmd.config.tmtc import CmdTreeNode from tmtccmd.config.tmtc import CmdTreeNode
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd
from tmtccmd.pus.s20_fsfw_param_defs import ( from tmtccmd.pus.s20_fsfw_param_defs import (
@ -557,7 +557,7 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
sus_list_formatted = vec_fmt.format(*sus_list) sus_list_formatted = vec_fmt.format(*sus_list)
current_idx += length current_idx += length
pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}") pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=12)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=12))
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes): def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
@ -593,7 +593,7 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
sun_ijk_model = vec_fmt.format(*sun_ijk_model) sun_ijk_model = vec_fmt.format(*sun_ijk_model)
current_idx += inc_len current_idx += inc_len
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}") pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=15)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=15))
def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes): def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
@ -649,7 +649,7 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}") pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1 current_idx += 1
assert current_idx == 61 assert current_idx == 61
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -703,7 +703,7 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
current_idx += inc_len current_idx += inc_len
if PERFORM_MGM_CALIBRATION: if PERFORM_MGM_CALIBRATION:
perform_mgm_calibration(pw, mgm_3) perform_mgm_calibration(pw, mgm_3)
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=8))
def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes): def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
@ -737,7 +737,7 @@ def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}") pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}")
pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}") pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}")
pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}") pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 4))
GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"] GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"]
@ -763,7 +763,7 @@ def handle_gyr_data_processed(pw: PrintWrapper, hk_data: bytes):
] ]
pw.dlog(f"GYR Vec Total: {gyr_vec_tot}") pw.dlog(f"GYR Vec Total: {gyr_vec_tot}")
current_idx += inc_len current_idx += inc_len
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes): def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
@ -826,7 +826,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"GPS Altitude: {alt} [m]") pw.dlog(f"GPS Altitude: {alt} [m]")
pw.dlog(f"GPS Position: {pos} [m]") pw.dlog(f"GPS Position: {pos} [m]")
pw.dlog(f"GPS Velocity: {velo} [m/s]") pw.dlog(f"GPS Velocity: {velo} [m/s]")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
def handle_attitude_estimation_data(pw: PrintWrapper, hk_data: bytes): def handle_attitude_estimation_data(pw: PrintWrapper, hk_data: bytes):
@ -881,9 +881,9 @@ def handle_attitude_estimation_data(pw: PrintWrapper, hk_data: bytes):
) )
current_idx += inc_len_quat current_idx += inc_len_quat
pw.dlog(f"{'QUEST Quaternion'.ljust(25)}: {fmt_str_4.format(*quest_quat)}") pw.dlog(f"{'QUEST Quaternion'.ljust(25)}: {fmt_str_4.format(*quest_quat)}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=4)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=4))
return return
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes): def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
@ -940,7 +940,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Control Values Error Quaternion: {err_quat}") pw.dlog(f"Control Values Error Quaternion: {err_quat}")
pw.dlog(f"Control Values Error Angle: {err_ang} [deg]") pw.dlog(f"Control Values Error Angle: {err_ang} [deg]")
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]") pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes): def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
@ -979,7 +979,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}") pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}") pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}") pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes): def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes):
@ -1030,9 +1030,9 @@ def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes):
) )
else: else:
pw.dlog(f"Ctrl Strategy (key unknown): {rot_rate_source}") pw.dlog(f"Ctrl Strategy (key unknown): {rot_rate_source}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=4)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=4))
return return
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_fused_rot_rate_source_data(pw: PrintWrapper, hk_data: bytes): def handle_fused_rot_rate_source_data(pw: PrintWrapper, hk_data: bytes):
@ -1087,7 +1087,7 @@ def handle_fused_rot_rate_source_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Fused Rotational Rate Total SUSMGM: {rot_rate_total_susmgm} [deg/s]") pw.dlog(f"Fused Rotational Rate Total SUSMGM: {rot_rate_total_susmgm} [deg/s]")
pw.dlog(f"Fused Rotational Rate Total QUEST: {rot_rate_total_quest} [deg/s]") pw.dlog(f"Fused Rotational Rate Total QUEST: {rot_rate_total_quest} [deg/s]")
pw.dlog(f"Fused Rotational Rate Total STR: {rot_rate_total_str} [deg/s]") pw.dlog(f"Fused Rotational Rate Total STR: {rot_rate_total_str} [deg/s]")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
def handle_acs_ctrl_action_replies( def handle_acs_ctrl_action_replies(

View File

@ -14,7 +14,7 @@ from tmtccmd.pus.tc.s3_fsfw_hk import (
create_disable_periodic_hk_command_with_diag, create_disable_periodic_hk_command_with_diag,
) )
from tmtccmd.pus.tc.s8_fsfw_action import create_action_cmd from tmtccmd.pus.tc.s8_fsfw_action import create_action_cmd
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -203,9 +203,7 @@ def handle_core_data(pw: PrintWrapper, hk_data: bytes):
) )
pw.dlog(f"GNSS Date: {date_string}") pw.dlog(f"GNSS Date: {date_string}")
pw.dlog(f"Unix seconds {unix_seconds}") pw.dlog(f"Unix seconds {unix_seconds}")
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=14))
validity_buffer=hk_data[current_idx:], num_vars=14
)
def handle_skyview_data(pw: PrintWrapper, hk_data: bytes): def handle_skyview_data(pw: PrintWrapper, hk_data: bytes):
@ -265,6 +263,4 @@ def handle_skyview_data(pw: PrintWrapper, hk_data: bytes):
used[idx], used[idx],
) )
) )
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=6))
validity_buffer=hk_data[current_idx:], num_vars=6
)

View File

@ -31,7 +31,7 @@ from tmtccmd.pus.tc.s3_fsfw_hk import (
) )
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -543,7 +543,7 @@ def handle_dipole_set(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Dipole Y: {dipole_y}") pw.dlog(f"Dipole Y: {dipole_y}")
pw.dlog(f"Dipole Z: {dipole_z}") pw.dlog(f"Dipole Z: {dipole_z}")
pw.dlog(f"Current torque duration: {current_torque_duration}") pw.dlog(f"Current torque duration: {current_torque_duration}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 2) pw.dlog(get_validity_buffer_str(hk_data[fmt_len:], 2))
def unpack_eng_hk(hk_data: bytes) -> List: def unpack_eng_hk(hk_data: bytes) -> List:
@ -583,9 +583,7 @@ def handle_eng_set(pw: PrintWrapper, hk_data: bytes, torque_on: bool):
for k, v in zip(ENG_HK_HEADERS, content_list): for k, v in zip(ENG_HK_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
validity_buffer=validity_buffer, num_vars=num_of_vars
)
) )
@ -597,9 +595,7 @@ def handle_status_set(pw: PrintWrapper, hk_data: bytes):
for k, v in zip(STATUS_HEADERS, content_list): for k, v in zip(STATUS_HEADERS, content_list):
pw.dlog(f"{k.ljust(30)}: {v}") pw.dlog(f"{k.ljust(30)}: {v}")
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
validity_buffer=validity_buffer, num_vars=num_of_vars
)
) )
@ -620,9 +616,7 @@ def handle_calibrated_mtm_measurement(pw: PrintWrapper, hk_data: bytes):
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
validity_buffer=validity_buffer, num_vars=num_of_vars
)
) )
@ -644,9 +638,7 @@ def handle_raw_mtm_measurement(pw: PrintWrapper, hk_data: bytes, torque_status:
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
validity_buffer=validity_buffer, num_vars=num_of_vars
)
) )
@ -783,7 +775,5 @@ def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
validity_buffer=validity_buffer, num_vars=num_of_vars
)
) )

View File

@ -23,7 +23,7 @@ from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode, Subservice from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode, Subservice
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
class OpCodesDev: class OpCodesDev:
@ -301,7 +301,7 @@ def handle_rw_hk_data(
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), " f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
"1: High Current Mode (0.6 A)" "1: High Current Mode (0.6 A)"
) )
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 5))
if set_id == RwSetId.LAST_RESET: if set_id == RwSetId.LAST_RESET:
pw.dlog( pw.dlog(
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}" f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
@ -390,7 +390,7 @@ def handle_rw_hk_data(
) )
if current_idx > 0: if current_idx > 0:
pw.dlog( pw.dlog(
FsfwTmTcPrinter.get_validity_buffer( get_validity_buffer_str(
validity_buffer=hk_data[current_idx:], num_vars=27 validity_buffer=hk_data[current_idx:], num_vars=27
) )
) )

View File

@ -27,7 +27,7 @@ from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -750,12 +750,13 @@ def create_update_firmware_target_cmd(
else: else:
param_id = ParamId.FIRMWARE_TARGET param_id = ParamId.FIRMWARE_TARGET
return create_load_param_cmd( return create_load_param_cmd(
create_scalar_u8_parameter( parameter=create_scalar_u8_parameter(
STAR_TRACKER_ID, STAR_TRACKER_ID,
0, 0,
param_id, param_id,
fw_target, fw_target,
) ),
apid=0,
) )
@ -930,7 +931,7 @@ def handle_version_set(hk_data: bytes, pw: PrintWrapper):
minor = struct.unpack("!B", hk_data[current_idx : current_idx + 1])[0] minor = struct.unpack("!B", hk_data[current_idx : current_idx + 1])[0]
pw.dlog(f"Minor: {minor}") pw.dlog(f"Minor: {minor}")
current_idx += 1 current_idx += 1
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
def handle_temperature_set(hk_data: bytes, pw: PrintWrapper): def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
@ -947,7 +948,7 @@ def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"CMOS Temperature: {cmos_temp}") pw.dlog(f"CMOS Temperature: {cmos_temp}")
pw.dlog(f"FPGA Temperature: {fpga_temp}") pw.dlog(f"FPGA Temperature: {fpga_temp}")
current_idx += fmt_len current_idx += fmt_len
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 5))
def handle_solution_set(hk_data: bytes, pw: PrintWrapper): def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
@ -1021,7 +1022,7 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
solution_strategy = hk_data[current_idx] solution_strategy = hk_data[current_idx]
pw.dlog(f"Solution strategy: {solution_strategy}") pw.dlog(f"Solution strategy: {solution_strategy}")
current_idx += 1 current_idx += 1
print(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 23))
def handle_blob_set(hk_data: bytes, pw: PrintWrapper): def handle_blob_set(hk_data: bytes, pw: PrintWrapper):
@ -1032,7 +1033,7 @@ def handle_blob_set(hk_data: bytes, pw: PrintWrapper):
current_idx = unpack_time_hk(hk_data, 0, pw) current_idx = unpack_time_hk(hk_data, 0, pw)
blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0] blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
pw.dlog(f"Blob count: {blob_count}") pw.dlog(f"Blob count: {blob_count}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx + 4 :], num_vars=3) pw.dlog(get_validity_buffer_str(hk_data[current_idx + 4 :], num_vars=3))
def handle_blobs_set(hk_data: bytes, pw: PrintWrapper): def handle_blobs_set(hk_data: bytes, pw: PrintWrapper):
@ -1060,7 +1061,7 @@ def handle_blobs_set(hk_data: bytes, pw: PrintWrapper):
for idx in range(8): for idx in range(8):
pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx])) pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx]))
assert current_idx == len(hk_data) - 1 assert current_idx == len(hk_data) - 1
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=7) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=7))
def handle_centroid_set(hk_data: bytes, pw: PrintWrapper): def handle_centroid_set(hk_data: bytes, pw: PrintWrapper):
@ -1074,7 +1075,7 @@ def handle_centroid_set(hk_data: bytes, pw: PrintWrapper):
current_idx += 4 current_idx += 4
pw.dlog(f"Centroid count: {centroid_count}") pw.dlog(f"Centroid count: {centroid_count}")
assert current_idx == len(hk_data) - 1 assert current_idx == len(hk_data) - 1
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_centroids_set(hk_data: bytes, pw: PrintWrapper): def handle_centroids_set(hk_data: bytes, pw: PrintWrapper):
@ -1101,7 +1102,7 @@ def handle_centroids_set(hk_data: bytes, pw: PrintWrapper):
) )
) )
assert current_idx == len(hk_data) - 1 assert current_idx == len(hk_data) - 1
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper): def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper):
@ -1145,7 +1146,7 @@ def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper):
) )
) )
assert current_idx == len(hk_data) - 1 assert current_idx == len(hk_data) - 1
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=8))
def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper): def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper):
@ -1163,7 +1164,7 @@ def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper):
current_idx += inc_len current_idx += inc_len
assert current_idx == len(hk_data) - 1 assert current_idx == len(hk_data) - 1
pw.dlog(f"Threshold {threshold}") pw.dlog(f"Threshold {threshold}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_histo_or_contrast_set(name: str, hk_data: bytes, pw: PrintWrapper): def handle_histo_or_contrast_set(name: str, hk_data: bytes, pw: PrintWrapper):
@ -1239,7 +1240,7 @@ def handle_blob_stats_set(hk_data: bytes, pw: PrintWrapper):
i, noise_list[i], threshold_list[i], lvalid_list[i], oflow_list[i] i, noise_list[i], threshold_list[i], lvalid_list[i], oflow_list[i]
) )
) )
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
def handle_star_tracker_action_replies( def handle_star_tracker_action_replies(

View File

@ -3,7 +3,7 @@ import struct
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -26,6 +26,6 @@ def handle_sus_hk(
pw.dlog("AIN Channel | Raw Value (hex) | Raw Value (dec)") pw.dlog("AIN Channel | Raw Value (hex) | Raw Value (dec)")
for idx, val in enumerate(channels): for idx, val in enumerate(channels):
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5)) pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=7 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=7)
) )

View File

@ -78,12 +78,13 @@ def pack_pdec_handler_commands(
pw = int(input("Specify positive window to set: ")) pw = int(input("Specify positive window to set: "))
q.add_pus_tc( q.add_pus_tc(
create_load_param_cmd( create_load_param_cmd(
create_scalar_u8_parameter( parameter=create_scalar_u8_parameter(
object_id, object_id,
0, 0,
ParameterId.POSITIVE_WINDOW, ParameterId.POSITIVE_WINDOW,
pw, pw,
) ),
apid=0,
) )
) )
if cmd_str == OpCode.NEGATIVE_WINDOW: if cmd_str == OpCode.NEGATIVE_WINDOW:
@ -91,12 +92,13 @@ def pack_pdec_handler_commands(
nw = int(input("Specify negative window to set: ")) nw = int(input("Specify negative window to set: "))
q.add_pus_tc( q.add_pus_tc(
create_load_param_cmd( create_load_param_cmd(
create_scalar_u8_parameter( parameter=create_scalar_u8_parameter(
object_id, object_id,
0, 0,
ParameterId.NEGATIVE_WINDOW, ParameterId.NEGATIVE_WINDOW,
nw, nw,
) ),
apid=0,
) )
) )
if cmd_str == OpCode.RESET_NO_INIT: if cmd_str == OpCode.RESET_NO_INIT:

View File

@ -17,7 +17,7 @@ from tmtccmd.config.tmtc import (
TmtcDefinitionWrapper, TmtcDefinitionWrapper,
tmtc_definitions_provider, tmtc_definitions_provider,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s200_fsfw_mode import Mode, create_mode_command from tmtccmd.pus.s200_fsfw_mode import Mode, create_mode_command
from tmtccmd.pus.tc.s3_fsfw_hk import ( from tmtccmd.pus.tc.s3_fsfw_hk import (
create_disable_periodic_hk_command_with_diag, create_disable_periodic_hk_command_with_diag,
@ -297,7 +297,7 @@ def handle_syrlinks_temp_dataset(hk_info: HkTmInfo, pw: PrintWrapper):
temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0] temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0]
pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}") pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}")
pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}") pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[8:], 2)) pw.dlog(get_validity_buffer_str(hk_data[8:], 2))
def handle_syrlinks_rx_registers_dataset( def handle_syrlinks_rx_registers_dataset(
@ -364,9 +364,7 @@ def handle_syrlinks_rx_registers_dataset(
validity_buffer = hk_data[22:] validity_buffer = hk_data[22:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
pw.dlog( pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=8))
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
)
print(f"Carrier Detect: {carrier_detect}") print(f"Carrier Detect: {carrier_detect}")
print(f"Carrier Lock: {carrier_lock}") print(f"Carrier Lock: {carrier_lock}")
print(f"Data Lock (data clock recovery loop lock status): {data_lock}") print(f"Data Lock (data clock recovery loop lock status): {data_lock}")
@ -394,7 +392,7 @@ def handle_syrlinks_rx_registers_dataset(
"INSERT INTO syrlinks_rx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", "INSERT INTO syrlinks_rx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
( (
str(hk_info.packet_uuid), str(hk_info.packet_uuid),
hk_info.hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore hk_info.packet_datetime,
carrier_detect, carrier_detect,
carrier_lock, carrier_lock,
data_lock, data_lock,
@ -470,9 +468,7 @@ def handle_syrlinks_tx_registers_dataset(
validity_buffer = hk_info.hk_data[4:] validity_buffer = hk_info.hk_data[4:]
for header, content in zip(header_list, content_list): for header, content in zip(header_list, content_list):
pw.dlog(f"{header}: {content}") pw.dlog(f"{header}: {content}")
pw.dlog( pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=3))
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
)
# pw.dlog(f"TX CONV: {tx_conv!r}") # pw.dlog(f"TX CONV: {tx_conv!r}")
# pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}") # pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}")
print(f"TX Status: {tx_status_status!r}") print(f"TX Status: {tx_status_status!r}")
@ -504,7 +500,7 @@ def handle_syrlinks_tx_registers_dataset(
"INSERT INTO syrlinks_tx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "INSERT INTO syrlinks_tx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
( (
str(hk_info.packet_uuid), str(hk_info.packet_uuid),
hk_info.hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore hk_info.packet_datetime,
tx_status_status, tx_status_status,
tx_status_status.name, tx_status_status.name,
tx_conf_set, tx_conf_set,

View File

@ -15,7 +15,7 @@ from tmtccmd.pus.s20_fsfw_param import (
create_scalar_u8_parameter, create_scalar_u8_parameter,
create_load_param_cmd, create_load_param_cmd,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s11_tc_sched import ( from tmtccmd.pus.s11_tc_sched import (
create_enable_tc_sched_cmd, create_enable_tc_sched_cmd,
create_disable_tc_sched_cmd, create_disable_tc_sched_cmd,
@ -399,12 +399,13 @@ def pack_core_commands( # noqa C901
raise ValueError("Only 0 or 1 allowed for preferred SD card") raise ValueError("Only 0 or 1 allowed for preferred SD card")
q.add_pus_tc( q.add_pus_tc(
create_load_param_cmd( create_load_param_cmd(
create_scalar_u8_parameter( parameter=create_scalar_u8_parameter(
object_id=CORE_CONTROLLER_ID, object_id=CORE_CONTROLLER_ID,
domain_id=0, domain_id=0,
unique_id=ParamId.PREF_SD, unique_id=ParamId.PREF_SD,
parameter=pref_sd, parameter=pref_sd,
) ),
apid=0,
) )
) )
elif cmd_str == OpCode.CP_HELPER: elif cmd_str == OpCode.CP_HELPER:
@ -632,9 +633,7 @@ def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
f"PL Voltage [mV] {pl_voltage}" f"PL Voltage [mV] {pl_voltage}"
) )
pw.dlog(printout) pw.dlog(printout)
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[inc_len:], num_vars=3))
validity_buffer=hk_data[inc_len:], num_vars=3
)
def handle_core_ctrl_action_replies( def handle_core_ctrl_action_replies(

View File

@ -1,9 +1,8 @@
import enum import enum
import struct import struct
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -21,4 +20,4 @@ def handle_ier_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
pw.dlog(f"TM Errors: {tm_errors}") pw.dlog(f"TM Errors: {tm_errors}")
pw.dlog(f"Queue Errors: {queue_errors}") pw.dlog(f"Queue Errors: {queue_errors}")
pw.dlog(f"Store Errors: {store_hits}") pw.dlog(f"Store Errors: {store_hits}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 3)) pw.dlog(get_validity_buffer_str(hk_data[inc_len:], 3))

View File

@ -717,7 +717,7 @@ class DirElement:
size: int size: int
def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray): def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytes):
if action_id == ActionId.TM_MEM_READ_RPT: if action_id == ActionId.TM_MEM_READ_RPT:
header_list = [ header_list = [
"PLOC Memory Address", "PLOC Memory Address",

View File

@ -19,7 +19,7 @@ from tmtccmd.config.tmtc import CmdTreeNode
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from eive_tmtc.utility.input_helper import InputHelper from eive_tmtc.utility.input_helper import InputHelper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -701,7 +701,7 @@ def handle_hk_report(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"NVM 3 State {nvm_3_state}") pw.dlog(f"NVM 3 State {nvm_3_state}")
pw.dlog(f"Mission IO state {mission_io_state}") pw.dlog(f"Mission IO state {mission_io_state}")
pw.dlog(f"FMC state {fmc_state}") pw.dlog(f"FMC state {fmc_state}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 13)) pw.dlog(get_validity_buffer_str(hk_data[inc_len:], 13))
def handle_boot_report(hk_data: bytes, pw: PrintWrapper): def handle_boot_report(hk_data: bytes, pw: PrintWrapper):
@ -730,7 +730,7 @@ def handle_boot_report(hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"Active NVM: {active_nvm}") pw.dlog(f"Active NVM: {active_nvm}")
pw.dlog(f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}") pw.dlog(f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}")
pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}") pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 10))
def handle_adc_report(hk_data: bytes): def handle_adc_report(hk_data: bytes):

View File

@ -28,7 +28,7 @@ from tmtccmd.pus.s20_fsfw_param import (
) )
from spacepackets.ecss.tc import PusTelecommand from spacepackets.ecss.tc import PusTelecommand
from eive_tmtc.config.object_ids import PL_PCDU_ID from eive_tmtc.config.object_ids import PL_PCDU_ID
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -615,8 +615,8 @@ def handle_plpcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw.dlog(ch_print) pw.dlog(ch_print)
for i in range(12): for i in range(12):
pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}") pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}")
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=3 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=3)
) )

View File

@ -18,7 +18,7 @@ from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.util import ObjectIdU32 from tmtccmd.util import ObjectIdU32
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
class SetId(enum.IntEnum): class SetId(enum.IntEnum):
@ -115,6 +115,6 @@ def handle_rad_sensor_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
for idx, val in ain_dict.items(): for idx, val in ain_dict.items():
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}") pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
current_idx += inc_len current_idx += inc_len
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=7 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=7)
) )

View File

@ -82,7 +82,7 @@ def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, cmd_str:
def acu_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str): def acu_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
req_hk_cmds("ACU", q, op_code, ACU_HANDLER_ID, [SetId.CORE, SetId.AUX]) req_hk_cmds("ACU", q, op_code, ACU_HANDLER_ID, (SetId.CORE, SetId.AUX))
class ACUTestProcedure: class ACUTestProcedure:

View File

@ -5,7 +5,7 @@ from spacepackets.ecss import PusTelecommand
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
CmdTreeNode, CmdTreeNode,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s8_fsfw_action import create_action_cmd from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
@ -213,9 +213,7 @@ def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
validity_buffer = hk_data[inc_len:] validity_buffer = hk_data[inc_len:]
pw.dlog(str(HEADER_LIST)) pw.dlog(str(HEADER_LIST))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=10))
validity_buffer=validity_buffer, num_vars=10
)
elif set_id == BpxSetId.GET_CFG_SET: elif set_id == BpxSetId.GET_CFG_SET:
battheat_mode = hk_data[0] battheat_mode = hk_data[0]
battheat_low = struct.unpack("!b", hk_data[1:2])[0] battheat_low = struct.unpack("!b", hk_data[1:2])[0]
@ -229,6 +227,4 @@ def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
validity_buffer = hk_data[3:] validity_buffer = hk_data[3:]
pw.dlog(str(header_list)) pw.dlog(str(header_list))
pw.dlog(str(content_list)) pw.dlog(str(content_list))
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=10))
validity_buffer=validity_buffer, num_vars=10
)

View File

@ -259,4 +259,4 @@ def create_p60_dock_node() -> CmdTreeNode:
def p60_dock_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str): def p60_dock_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
req_hk_cmds("P60 Dock", q, op_code, P60_DOCK_HANDLER, [SetId.CORE, SetId.AUX]) req_hk_cmds("P60 Dock", q, op_code, P60_DOCK_HANDLER, (SetId.CORE, SetId.AUX))

View File

@ -98,7 +98,7 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, cmd_str
def pdu1_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str): def pdu1_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
req_hk_cmds("PDU1", q, op_code, PDU_1_HANDLER_ID, [SetId.CORE, SetId.AUX]) req_hk_cmds("PDU1", q, op_code, PDU_1_HANDLER_ID, (SetId.CORE, SetId.AUX))
def info_on_pdu1(base: str) -> str: def info_on_pdu1(base: str) -> str:

View File

@ -332,7 +332,7 @@ def add_pdu2_common_defs(oce: OpCodeEntry):
def pdu2_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str): def pdu2_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
req_hk_cmds("PDU2", q, op_code, PDU_2_HANDLER_ID, [SetId.CORE, SetId.AUX]) req_hk_cmds("PDU2", q, op_code, PDU_2_HANDLER_ID, (SetId.CORE, SetId.AUX))
def pl_pcdu_bat_nom_on_cmd(q: DefaultPusQueueHelper): def pl_pcdu_bat_nom_on_cmd(q: DefaultPusQueueHelper):

View File

@ -6,7 +6,7 @@ import struct
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
CmdTreeNode, CmdTreeNode,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd
from tmtccmd.pus.s20_fsfw_param_defs import ( from tmtccmd.pus.s20_fsfw_param_defs import (
create_scalar_double_parameter, create_scalar_double_parameter,
@ -268,7 +268,7 @@ def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes):
pw.dlog(f"Total Battery Current: {total_battery_current} [mA]") pw.dlog(f"Total Battery Current: {total_battery_current} [mA]")
pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]") pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]")
pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]") pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes): def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
@ -284,4 +284,4 @@ def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
)[0] )[0]
current_idx += inc_len_uint16 current_idx += inc_len_uint16
pw.dlog(f"PL Use Allowed: {pl_use_allowed}") pw.dlog(f"PL Use Allowed: {pl_use_allowed}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=1))

View File

@ -13,7 +13,7 @@ from eive_tmtc.tmtc.power.common_power import (
) )
from eive_tmtc.tmtc.power.power import PcduSetIds from eive_tmtc.tmtc.power.power import PcduSetIds
from tmtccmd.util import ObjectIdBase from tmtccmd.util import ObjectIdBase
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter, get_validity_buffer_str
from eive_tmtc.pus_tm.defs import PrintWrapper from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId
from eive_tmtc.config.object_ids import ( from eive_tmtc.config.object_ids import (
@ -125,6 +125,8 @@ class DevicesInfoParser:
return current_idx return current_idx
def print(self, pw: PrintWrapper): def print(self, pw: PrintWrapper):
if self.dev_types is None or self.dev_statuses is None:
return
pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)") pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)")
for i in range(len(self.dev_types)): for i in range(len(self.dev_types)):
pw.dlog( pw.dlog(
@ -372,8 +374,8 @@ def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | " temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | "
pw.dlog(temps) pw.dlog(temps)
pw.dlog(batt_info) pw.dlog(batt_info)
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=9 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=9)
) )
if set_id == SetId.AUX: if set_id == SetId.AUX:
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA") pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
@ -440,8 +442,8 @@ def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
"6:TempSens(BatPack)|7:TempSens(BatPack)" "6:TempSens(BatPack)|7:TempSens(BatPack)"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=27 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=27)
) )
@ -493,8 +495,8 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
f"Boot Count {bootcnt} | Uptime {uptime} sec | " f"Boot Count {bootcnt} | Uptime {uptime} sec | "
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec" f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
) )
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=12 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=12)
) )
if set_id == SetId.AUX: if set_id == SetId.AUX:
current_idx = 0 current_idx = 0
@ -531,13 +533,13 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved" "ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved"
) )
dev_parser.print(pw=pw) dev_parser.print(pw=pw)
FsfwTmTcPrinter.get_validity_buffer( pw.dlog(
validity_buffer=hk_data[current_idx:], num_vars=8 get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=8)
) )
def handle_get_param_data_reply( def handle_get_param_data_reply(
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytes
): ):
if action_id == GomspaceDeviceActionId.PARAM_GET: if action_id == GomspaceDeviceActionId.PARAM_GET:
pw.dlog(f"Parameter Get Request received for object {obj_id}") pw.dlog(f"Parameter Get Request received for object {obj_id}")
@ -677,4 +679,4 @@ def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
pw.dlog(f"{name.ljust(25)}: {val}") pw.dlog(f"{name.ljust(25)}: {val}")
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}") pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}") pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4) pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 4))

View File

@ -13,7 +13,7 @@ from tmtccmd.util import ObjectIdU32
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data, Subservice from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data, Subservice
import eive_tmtc.config.object_ids as oids import eive_tmtc.config.object_ids as oids
from eive_tmtc.config.object_ids import get_object_ids from eive_tmtc.config.object_ids import get_object_ids
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
RTD_IDS = [ RTD_IDS = [
oids.RTD_0_PLOC_HSPD, oids.RTD_0_PLOC_HSPD,
@ -170,7 +170,7 @@ def handle_rtd_hk(object_id: bytes, hk_data: bytes, pw: PrintWrapper):
pw.dlog(f"RTD Value: {rtd_val}") pw.dlog(f"RTD Value: {rtd_val}")
pw.dlog(f"Error Byte: {error_byte}") pw.dlog(f"Error Byte: {error_byte}")
pw.dlog(f"Last Error Byte: {last_err_byte}") pw.dlog(f"Last Error Byte: {last_err_byte}")
FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 4) pw.dlog(get_validity_buffer_str(hk_data[fmt_len:], 4))
def prompt_rtd_idx(): def prompt_rtd_idx():

View File

@ -18,7 +18,7 @@ from tmtccmd.config.tmtc import (
TmtcDefinitionWrapper, TmtcDefinitionWrapper,
OpCodeEntry, OpCodeEntry,
) )
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
from tmtccmd.tmtc import DefaultPusQueueHelper from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command, make_sid from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command, make_sid
@ -96,4 +96,4 @@ def handle_tmp_1075_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
if set_id == SetId.TEMPERATURE: if set_id == SetId.TEMPERATURE:
temp = struct.unpack("!f", hk_data[0:4])[0] temp = struct.unpack("!f", hk_data[0:4])[0]
pw.dlog(f"TMP1075 Temperature: {temp}") pw.dlog(f"TMP1075 Temperature: {temp}")
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[4:], 1)) pw.dlog(get_validity_buffer_str(hk_data[4:], 1))

View File

@ -6,32 +6,32 @@ build-backend = "setuptools.build_meta"
name = "eive-tmtc" name = "eive-tmtc"
description = "TMTC Commander EIVE" description = "TMTC Commander EIVE"
readme = "README.md" readme = "README.md"
version = "7.0.0" version = "7.2.0"
requires-python = ">=3.10" requires-python = ">=3.10"
license = {text = "Apache-2.0"} license = {text = "Apache-2.0"}
authors = [ authors = [
{name = "Robin Mueller", email = "muellerr@irs.uni-stuttgart.de"}, {name = "Robin Mueller", email = "muellerr@irs.uni-stuttgart.de"},
{name = "Jakob Meier", email = "meierj@irs.uni-stuttgart.de"}, {name = "Jakob Meier", email = "meierj@irs.uni-stuttgart.de"},
] ]
keywords = ["eive", "space", "communication", "commanding"] keywords = ["eive", "space", "communication", "commanding"]
classifiers = [ classifiers = [
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Natural Language :: English", "Natural Language :: English",
"Operating System :: POSIX", "Operating System :: POSIX",
"Operating System :: Microsoft :: Windows", "Operating System :: Microsoft :: Windows",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Topic :: Communications", "Topic :: Communications",
"Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Scientific/Engineering" "Topic :: Scientific/Engineering"
] ]
dependencies = [ dependencies = [
"tmtccmd == 8.0.0rc1", "tmtccmd ~= 8.1",
"cfdp-py~=0.1.0",
# "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main", # "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main",
"cfdp-py>=0.4, <=0.5",
"python-dateutil ~= 2.8", "python-dateutil ~= 2.8",
] ]
@ -45,7 +45,7 @@ include-package-data = true
[tool.setuptools.packages] [tool.setuptools.packages]
find = {} find = {}
[tool.ruff] [tool.ruff.lint]
ignore = ["E501"] ignore = ["E501"]
[tool.ruff.extend-per-file-ignores] [tool.ruff.lint.extend-per-file-ignores]
"__init__.py" = ["F401"] "__init__.py" = ["F401"]

View File

@ -97,8 +97,8 @@ def setup_params() -> Tuple[SetupWrapper, int]:
hk_level = int(post_arg_parsing_wrapper.args_raw.hk) hk_level = int(post_arg_parsing_wrapper.args_raw.hk)
else: else:
hk_level = 0 hk_level = 0
if params.app_params.print_tree: if params.cmd_params.print_tree:
perform_tree_printout(params.app_params, hook_obj.get_command_definitions()) perform_tree_printout(params.cmd_params, hook_obj.get_command_definitions())
sys.exit(0) sys.exit(0)
params.apid = PUS_APID params.apid = PUS_APID
if params.com_if is None: if params.com_if is None: