Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
6e85b1add8 | |||
73a7c6ec14 | |||
28161059ca | |||
52d434630b | |||
55a357b52c | |||
2d6e9f826c | |||
20489a02d2 | |||
652f3bf66f | |||
8419a4edd7 | |||
673c8b2cf2 | |||
e23b39e652 | |||
9a55686a26 | |||
067a016040 | |||
dccbf89da1 | |||
5cf76c07e9 | |||
bc53f7e81a | |||
d6b879da67 | |||
86a68e25f7 |
@ -10,6 +10,14 @@ list yields a list of all related PRs for each release.
|
||||
|
||||
# [unreleased]
|
||||
|
||||
# [v7.1.0] 2025-01-17
|
||||
|
||||
- Bumped `tmtccmd` to v8.1.0
|
||||
|
||||
## Fixed
|
||||
|
||||
- Use new mode TM API.
|
||||
|
||||
# [v7.0.0] 2024-05-06
|
||||
|
||||
- Reworked PLOC MPSoC commanding to be inline with OBSW update.
|
||||
|
@ -62,13 +62,7 @@ class CfdpHandler:
|
||||
)
|
||||
|
||||
def put_request(self, request: PutRequest):
|
||||
if not self.remote_cfg_table.get_cfg(request.destination_id):
|
||||
raise ValueError(
|
||||
f"No remote CFDP config found for entity ID {request.destination_id}"
|
||||
)
|
||||
self.source_handler.put_request(
|
||||
request, self.remote_cfg_table.get_cfg(request.destination_id) # type: ignore
|
||||
)
|
||||
self.source_handler.put_request(request)
|
||||
|
||||
def pull_next_source_packet(self) -> Optional[PduHolder]:
|
||||
res = self.source_handler.state_machine()
|
||||
|
@ -17,6 +17,7 @@ TM_DB_PATH = "tm.db"
|
||||
|
||||
# TODO: The cleanest way would be to load those from the config file..
|
||||
PRINT_RAW_HK_B64_STR = False
|
||||
PRINT_RAW_ACTION_DATA_REPLY_B64_STR = True
|
||||
PRINT_RAW_EVENTS_B64_STR = False
|
||||
|
||||
PUS_APID = 0x65
|
||||
|
@ -52,7 +52,7 @@ from eive_tmtc.tmtc.wdt import create_wdt_node
|
||||
|
||||
class EiveHookObject(HookBase):
|
||||
def __init__(self, json_cfg_path: str):
|
||||
super().__init__(json_cfg_path=json_cfg_path)
|
||||
super().__init__(json_cfg_path)
|
||||
|
||||
def get_command_definitions(self) -> CmdTreeNode:
|
||||
root_node = CmdTreeNode.root_node()
|
||||
|
@ -4,7 +4,7 @@
|
||||
import logging
|
||||
from typing import List, cast
|
||||
|
||||
from tmtccmd import DefaultProcedureInfo
|
||||
from tmtccmd import TreeCommandingProcedure
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
|
||||
@ -78,7 +78,7 @@ from eive_tmtc.utility.input_helper import InputHelper
|
||||
|
||||
|
||||
def handle_pus_procedure(
|
||||
info: DefaultProcedureInfo,
|
||||
info: TreeCommandingProcedure,
|
||||
queue_helper: DefaultPusQueueHelper,
|
||||
):
|
||||
cmd_path = info.cmd_path
|
||||
|
@ -68,8 +68,8 @@ class TcHandler(TcHandlerBase):
|
||||
|
||||
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
|
||||
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
|
||||
if info.proc_type == TcProcedureType.DEFAULT:
|
||||
handle_pus_procedure(info.to_def_procedure(), self.queue_helper)
|
||||
if info.proc_type == TcProcedureType.TREE_COMMANDING:
|
||||
handle_pus_procedure(info.to_tree_commanding_procedure(), self.queue_helper)
|
||||
elif info.proc_type == TcProcedureType.CFDP:
|
||||
self.handle_cfdp_procedure(info)
|
||||
|
||||
@ -155,7 +155,7 @@ class TcHandler(TcHandlerBase):
|
||||
def queue_finished_cb(self, info: ProcedureWrapper):
|
||||
if info is not None:
|
||||
if info.proc_type == TcQueueEntryType.PUS_TC:
|
||||
def_proc = info.to_def_procedure()
|
||||
def_proc = info.to_tree_commanding_procedure()
|
||||
_LOGGER.info(f"Finished queue for command {def_proc.cmd_path}")
|
||||
elif info.proc_type == TcProcedureType.CFDP:
|
||||
_LOGGER.info("Finished CFDP queue")
|
||||
|
@ -1,5 +1,10 @@
|
||||
import base64
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from spacepackets.ecss import PusTm
|
||||
from tmtccmd.pus.s8_fsfw_action_defs import CustomSubservice
|
||||
from eive_tmtc.config.definitions import PRINT_RAW_ACTION_DATA_REPLY_B64_STR
|
||||
from eive_tmtc.config.object_ids import (
|
||||
ACU_HANDLER_ID,
|
||||
PDU_1_HANDLER_ID,
|
||||
@ -20,56 +25,76 @@ from eive_tmtc.tmtc.payload.ploc_supervisor import SupvActionId
|
||||
from eive_tmtc.tmtc.acs.star_tracker import handle_star_tracker_action_replies
|
||||
from eive_tmtc.tmtc.acs.acs_ctrl import handle_acs_ctrl_action_replies
|
||||
from eive_tmtc.tmtc.power.tm import handle_get_param_data_reply
|
||||
from tmtccmd.pus.s8_fsfw_action import Service8FsfwTm
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
from tmtccmd.util import ObjectIdDictT
|
||||
from tmtccmd.util import ObjectIdBase, ObjectIdDictT, ObjectIdU32
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def handle_action_reply(
|
||||
raw_tm: bytes, printer: FsfwTmTcPrinter, obj_id_dict: ObjectIdDictT
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def handle_action_service_tm(
|
||||
raw_tm: bytes, pw: PrintWrapper, obj_id_dict: ObjectIdDictT
|
||||
):
|
||||
"""Core Action reply handler
|
||||
:return:
|
||||
"""
|
||||
tm_packet = Service8FsfwTm.unpack(
|
||||
raw_telemetry=raw_tm, time_reader=CdsShortTimestamp.empty()
|
||||
)
|
||||
object_id = obj_id_dict.get(tm_packet.source_object_id_as_bytes)
|
||||
pw = PrintWrapper(printer.file_logger)
|
||||
custom_data = tm_packet.custom_data
|
||||
action_id = tm_packet.action_id
|
||||
generic_print_str = printer.generic_action_packet_tm_print(
|
||||
packet=tm_packet, obj_id=object_id
|
||||
)
|
||||
pw.dlog(generic_print_str)
|
||||
if object_id.as_bytes == IMTQ_HANDLER_ID:
|
||||
return handle_imtq_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == PLOC_MPSOC_ID:
|
||||
return handle_mpsoc_data_reply(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == PLOC_SUPV_ID:
|
||||
return handle_supervisor_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == CORE_CONTROLLER_ID:
|
||||
return handle_core_ctrl_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == STAR_TRACKER_ID:
|
||||
return handle_star_tracker_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == ACS_CONTROLLER:
|
||||
return handle_acs_ctrl_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes in [
|
||||
ACU_HANDLER_ID,
|
||||
PDU_1_HANDLER_ID,
|
||||
PDU_2_HANDLER_ID,
|
||||
P60_DOCK_HANDLER,
|
||||
]:
|
||||
return handle_get_param_data_reply(object_id, action_id, pw, custom_data)
|
||||
tm_packet = PusTm.unpack(raw_tm, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE)
|
||||
if len(tm_packet.source_data) < 8:
|
||||
_LOGGER.warning(
|
||||
"received action service reply with source data smaller than 8 bytes"
|
||||
)
|
||||
return
|
||||
object_id_raw = struct.unpack("!I", tm_packet.source_data[0:4])[0]
|
||||
action_id = struct.unpack("!I", tm_packet.source_data[4:8])[0]
|
||||
object_id = obj_id_dict.get(object_id_raw)
|
||||
custom_data = tm_packet.source_data[8:]
|
||||
if object_id is None:
|
||||
object_id = ObjectIdU32(object_id_raw, "Unknown ID")
|
||||
if tm_packet.subservice == CustomSubservice.TM_DATA_REPLY:
|
||||
if PRINT_RAW_ACTION_DATA_REPLY_B64_STR:
|
||||
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
|
||||
if object_id.as_bytes == IMTQ_HANDLER_ID:
|
||||
return handle_imtq_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == PLOC_MPSOC_ID:
|
||||
return handle_mpsoc_data_reply(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == PLOC_SUPV_ID:
|
||||
return handle_supervisor_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == CORE_CONTROLLER_ID:
|
||||
return handle_core_ctrl_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == STAR_TRACKER_ID:
|
||||
return handle_star_tracker_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes == ACS_CONTROLLER:
|
||||
return handle_acs_ctrl_action_replies(action_id, pw, custom_data)
|
||||
elif object_id.as_bytes in [
|
||||
ACU_HANDLER_ID,
|
||||
PDU_1_HANDLER_ID,
|
||||
PDU_2_HANDLER_ID,
|
||||
P60_DOCK_HANDLER,
|
||||
]:
|
||||
return handle_get_param_data_reply(object_id, action_id, pw, custom_data)
|
||||
else:
|
||||
# TODO: Could add a handler here depending on action ID and object ID.
|
||||
handle_action_data_reply(tm_packet, object_id, action_id, pw)
|
||||
else:
|
||||
pw.dlog(f"No dedicated action reply handler found for reply from {object_id}")
|
||||
pw.dlog(f"Raw Data: {tm_packet.custom_data.hex(sep=',')}")
|
||||
pw.dlog(
|
||||
f"service 8 packet from {object_id} with action ID {action_id} "
|
||||
f"and unknown subservice {tm_packet.subservice}"
|
||||
)
|
||||
|
||||
|
||||
def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray):
|
||||
def handle_action_data_reply(
|
||||
tm_packet: PusTm, named_obj_id: ObjectIdBase, action_id: int, printer: PrintWrapper
|
||||
):
|
||||
print_string = (
|
||||
f"service 8 data reply from {named_obj_id} with action ID {action_id} "
|
||||
f"and data size {len(tm_packet.tm_data[8:])}"
|
||||
)
|
||||
printer.dlog(print_string)
|
||||
|
||||
|
||||
def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytes):
|
||||
if action_id == struct.unpack("!I", ImtqActionId.get_commanded_dipole)[0]:
|
||||
header_list = [
|
||||
"Commanded X-Dipole",
|
||||
@ -82,7 +107,7 @@ def handle_imtq_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray
|
||||
pw.dlog(f"{content_list}")
|
||||
|
||||
|
||||
def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: bytearray):
|
||||
def handle_supervisor_replies(action_id: int, pw: PrintWrapper, custom_data: bytes):
|
||||
if action_id == SupvActionId.DUMP_MRAM:
|
||||
header_list = ["MRAM Dump"]
|
||||
content_list = [custom_data[: len(custom_data)]]
|
||||
|
@ -25,7 +25,7 @@ def handle_event_packet( # noqa C901: Complexity okay here
|
||||
): # noqa C901: Complexity okay here
|
||||
if PRINT_RAW_EVENTS_B64_STR:
|
||||
print(f"PUS Event TM Base64: {base64.b64encode(raw_tm)}")
|
||||
tm = Service5Tm.unpack(data=raw_tm, time_reader=CdsShortTimestamp.empty())
|
||||
tm = Service5Tm.unpack(data=raw_tm, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE)
|
||||
event_dict = get_event_dict()
|
||||
event_def = tm.event_definition
|
||||
info = event_dict.get(event_def.event_id)
|
||||
@ -40,10 +40,10 @@ def handle_event_packet( # noqa C901: Complexity okay here
|
||||
obj_name = event_def.reporter_id.hex(sep=",")
|
||||
else:
|
||||
obj_name = obj_id_obj.name
|
||||
assert tm.time_provider is not None
|
||||
timestamp = CdsShortTimestamp.unpack(tm.timestamp)
|
||||
generic_event_string = (
|
||||
f"Object {obj_name} generated Event {info.name} (ID: {event_def.event_id:#04x})"
|
||||
f" at {tm.time_provider.as_date_time()}"
|
||||
f" at {timestamp.as_date_time()}"
|
||||
)
|
||||
_LOGGER.info(generic_event_string)
|
||||
pw.file_logger.info(
|
||||
|
@ -2,20 +2,17 @@ import uuid
|
||||
import dataclasses
|
||||
import datetime
|
||||
import sqlite3
|
||||
from tmtccmd.pus.tm.s3_fsfw_hk import Service3FsfwTm
|
||||
from spacepackets.ecss.tm import CdsShortTimestamp, PusTm
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HkTmInfo:
|
||||
packet_uuid: uuid.UUID
|
||||
hk_packet: Service3FsfwTm
|
||||
hk_packet: PusTm
|
||||
set_id: int
|
||||
db_con: sqlite3.Connection
|
||||
hk_data: bytes
|
||||
|
||||
@property
|
||||
def packet_datetime(self) -> datetime.datetime:
|
||||
return self.hk_packet.pus_tm.time_provider.as_datetime()
|
||||
|
||||
@property
|
||||
def set_id(self) -> int:
|
||||
return self.hk_packet.set_id
|
||||
return CdsShortTimestamp.unpack(self.hk_packet.timestamp).as_datetime()
|
||||
|
@ -2,10 +2,14 @@
|
||||
|
||||
import dataclasses
|
||||
import logging
|
||||
import base64 # noqa
|
||||
import base64
|
||||
import sqlite3
|
||||
import struct
|
||||
from typing import List, cast
|
||||
from uuid import UUID
|
||||
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
from spacepackets.ecss import PusTm
|
||||
from eive_tmtc.config.definitions import PRINT_RAW_HK_B64_STR
|
||||
from eive_tmtc.pus_tm.hk import HkTmInfo
|
||||
|
||||
@ -23,9 +27,6 @@ from eive_tmtc.tmtc.power.pwr_ctrl import handle_pwr_ctrl_hk_data
|
||||
from eive_tmtc.tmtc.com.syrlinks_handler import handle_syrlinks_hk_data
|
||||
from eive_tmtc.tmtc.tcs import handle_thermal_controller_hk_data
|
||||
from eive_tmtc.tmtc.tcs.tmp1075 import handle_tmp_1075_hk_data
|
||||
from tmtccmd.pus.tm.s3_fsfw_hk import (
|
||||
Service3FsfwTm,
|
||||
)
|
||||
from tmtccmd.pus.tm.s3_hk_base import HkContentType
|
||||
from tmtccmd.util.obj_id import ObjectIdU32, ObjectIdDictT
|
||||
|
||||
@ -68,17 +69,20 @@ def handle_hk_packet(
|
||||
hk_level: int,
|
||||
db_con: sqlite3.Connection,
|
||||
):
|
||||
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
|
||||
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(tm_packet.object_id.as_bytes))
|
||||
tm_packet = PusTm.unpack(raw_tm, CdsShortTimestamp.TIMESTAMP_SIZE)
|
||||
obj_id_raw = struct.unpack("!I", tm_packet.tm_data[0:4])[0]
|
||||
named_obj_id = cast(ObjectIdU32, obj_id_dict.get(obj_id_raw))
|
||||
if named_obj_id is None:
|
||||
named_obj_id = tm_packet.object_id
|
||||
named_obj_id = ObjectIdU32(obj_id_raw, "Unknown ID")
|
||||
if tm_packet.subservice == 25 or tm_packet.subservice == 26:
|
||||
set_id = struct.unpack("!I", tm_packet.tm_data[4:8])[0]
|
||||
hk_data = tm_packet.tm_data[8:]
|
||||
if named_obj_id.as_bytes in hk_filter.object_ids:
|
||||
if PRINT_RAW_HK_B64_STR:
|
||||
print(f"PUS TM Base64: {base64.b64encode(raw_tm)}")
|
||||
handle_regular_hk_print(
|
||||
printer=printer,
|
||||
set_id=set_id,
|
||||
packet_uuid=packet_uuid,
|
||||
object_id=named_obj_id,
|
||||
hk_packet=tm_packet,
|
||||
@ -89,11 +93,12 @@ def handle_hk_packet(
|
||||
try:
|
||||
if hk_level >= 1:
|
||||
printer.generic_hk_tm_print(
|
||||
HkContentType.HK, named_obj_id, tm_packet.set_id, hk_data
|
||||
HkContentType.HK, named_obj_id, set_id, hk_data
|
||||
)
|
||||
if hk_level >= 1:
|
||||
handle_regular_hk_print(
|
||||
printer=printer,
|
||||
set_id=set_id,
|
||||
packet_uuid=packet_uuid,
|
||||
object_id=named_obj_id,
|
||||
hk_packet=tm_packet,
|
||||
@ -109,7 +114,8 @@ def handle_hk_packet(
|
||||
|
||||
|
||||
def handle_regular_hk_print( # noqa C901: Complexity okay here
|
||||
hk_packet: Service3FsfwTm,
|
||||
hk_packet: PusTm,
|
||||
set_id: int,
|
||||
packet_uuid: UUID,
|
||||
hk_data: bytes,
|
||||
db: sqlite3.Connection,
|
||||
@ -117,12 +123,15 @@ def handle_regular_hk_print( # noqa C901: Complexity okay here
|
||||
printer: FsfwTmTcPrinter,
|
||||
):
|
||||
objb = object_id.as_bytes
|
||||
set_id = hk_packet.set_id
|
||||
hk_info = HkTmInfo(
|
||||
packet_uuid=packet_uuid, hk_packet=hk_packet, db_con=db, hk_data=hk_data
|
||||
packet_uuid=packet_uuid,
|
||||
hk_packet=hk_packet,
|
||||
db_con=db,
|
||||
hk_data=hk_data,
|
||||
set_id=set_id,
|
||||
)
|
||||
assert hk_packet.pus_tm.time_provider is not None
|
||||
packet_dt = hk_packet.pus_tm.time_provider.as_date_time()
|
||||
timestamp = CdsShortTimestamp.unpack(hk_packet.timestamp)
|
||||
packet_dt = timestamp.as_datetime()
|
||||
pw = PrintWrapper(printer.file_logger)
|
||||
"""This function is called when a Service 3 Housekeeping packet is received."""
|
||||
if objb in [obj_ids.RW1_ID, obj_ids.RW2_ID, obj_ids.RW3_ID, obj_ids.RW4_ID]:
|
||||
@ -139,7 +148,7 @@ def handle_regular_hk_print( # noqa C901: Complexity okay here
|
||||
pw=pw,
|
||||
set_id=set_id,
|
||||
hk_data=hk_data,
|
||||
packet_time=packet_dt,
|
||||
packet_time=timestamp.as_datetime(),
|
||||
)
|
||||
elif objb == obj_ids.PCDU_HANDLER_ID:
|
||||
return handle_pcdu_hk(pw=pw, set_id=set_id, hk_data=hk_data)
|
||||
|
@ -15,14 +15,14 @@ from tmtccmd.logging.pus import RawTmtcTimedLogWrapper
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.pus.s20_fsfw_param import Service20FsfwTm, Service20ParamDumpWrapper
|
||||
from tmtccmd.pus.s20_fsfw_param_defs import CustomSubservice as ParamSubservice
|
||||
from tmtccmd.pus.s200_fsfw_mode import Service200FsfwTm
|
||||
from tmtccmd.pus.s200_fsfw_mode import Service200FsfwReader, Service200FsfwTm
|
||||
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservice
|
||||
from tmtccmd.tmtc import GenericApidHandlerBase, SpecificApidHandlerBase
|
||||
from eive_tmtc.config.definitions import TM_DB_PATH, PUS_APID
|
||||
|
||||
from eive_tmtc.config.object_ids import get_object_ids
|
||||
|
||||
from .action_reply_handler import handle_action_reply
|
||||
from .action_reply_handler import handle_action_service_tm
|
||||
from .defs import PrintWrapper
|
||||
from .event_handler import handle_event_packet
|
||||
from .hk_handler import HkFilter, handle_hk_packet
|
||||
@ -63,15 +63,22 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
_LOGGER.warning("Detected packet shorter than 8 bytes!")
|
||||
return
|
||||
try:
|
||||
tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.empty())
|
||||
tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
|
||||
# _LOGGER.info(f"Sequence count: {tm_packet.seq_count}")
|
||||
except ValueError as value_error:
|
||||
_LOGGER.warning(f"{value_error}")
|
||||
_LOGGER.warning("Could not generate PUS TM object from raw data")
|
||||
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
|
||||
return
|
||||
timestamp = CdsShortTimestamp.unpack(tm_packet.timestamp)
|
||||
db_con = sqlite3.connect(TM_DB_PATH)
|
||||
self._store_packet_in_db(db_con, packet, tm_packet, packet_uuid)
|
||||
self._store_packet_in_db(
|
||||
db_con=db_con,
|
||||
packet=packet,
|
||||
tm_packet=tm_packet,
|
||||
timestamp=timestamp,
|
||||
packet_uuid=packet_uuid,
|
||||
)
|
||||
service = tm_packet.service
|
||||
dedicated_handler = True
|
||||
if service == 1:
|
||||
@ -89,12 +96,12 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
elif service == 5:
|
||||
handle_event_packet(raw_tm=packet, pw=self.pw)
|
||||
elif service == 8:
|
||||
handle_action_reply(
|
||||
raw_tm=packet, printer=self.printer, obj_id_dict=self.obj_id_dict
|
||||
handle_action_service_tm(
|
||||
raw_tm=packet, pw=self.pw, obj_id_dict=self.obj_id_dict
|
||||
)
|
||||
elif service == 17:
|
||||
pus17_tm = Service17Tm.unpack(
|
||||
data=packet, time_reader=CdsShortTimestamp.empty()
|
||||
data=packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
if pus17_tm.subservice == 2:
|
||||
self.verif_wrapper.dlog("Received Ping Reply TM[17,2]")
|
||||
@ -102,7 +109,7 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
elif service == 20:
|
||||
self._handle_param_packet(packet, tm_packet)
|
||||
elif service == 200:
|
||||
dedicated_handler = self._handle_mode_packet(packet, tm_packet)
|
||||
dedicated_handler = self._handle_mode_packet(tm_packet)
|
||||
else:
|
||||
_LOGGER.info(
|
||||
f"The service {service} is not implemented in Telemetry Factory"
|
||||
@ -119,11 +126,11 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
self,
|
||||
db_con: sqlite3.Connection,
|
||||
packet: bytes,
|
||||
timestamp: CdsShortTimestamp,
|
||||
tm_packet: PusTelemetry,
|
||||
packet_uuid: uuid.UUID,
|
||||
):
|
||||
cursor = db_con.cursor()
|
||||
assert tm_packet.time_provider is not None
|
||||
cursor.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS pus_tm(
|
||||
@ -139,7 +146,7 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
"INSERT INTO pus_tm VALUES(?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
str(packet_uuid),
|
||||
tm_packet.time_provider.as_datetime(),
|
||||
timestamp.as_datetime(),
|
||||
tm_packet.service,
|
||||
tm_packet.subservice,
|
||||
len(packet),
|
||||
@ -150,7 +157,7 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
|
||||
def _handle_param_packet(self, raw_data: bytes, tm_packet: PusTelemetry):
|
||||
param_packet = Service20FsfwTm.unpack(
|
||||
raw_telemetry=raw_data, time_reader=CdsShortTimestamp.empty()
|
||||
raw_telemetry=raw_data, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
|
||||
)
|
||||
if tm_packet.subservice == ParamSubservice.TM_DUMP_REPLY:
|
||||
param_wrapper = Service20ParamDumpWrapper(param_tm=param_packet)
|
||||
@ -186,21 +193,21 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
f"unknown subservice {tm_packet.subservice} for parameter service"
|
||||
)
|
||||
|
||||
def _handle_mode_packet(self, raw_data: bytes, _: PusTelemetry) -> bool:
|
||||
tm_packet = Service200FsfwTm.unpack(
|
||||
raw_telemetry=raw_data, time_reader=CdsShortTimestamp.empty()
|
||||
)
|
||||
if tm_packet.subservice == ModeSubservice.TM_CANT_REACH_MODE:
|
||||
def _handle_mode_packet(self, pus_tm: PusTelemetry) -> bool:
|
||||
tm_packet = Service200FsfwReader(pus_tm)
|
||||
if tm_packet.tm.subservice == ModeSubservice.TM_CANT_REACH_MODE:
|
||||
obj_id = tm_packet.object_id
|
||||
obj_id_obj = self.obj_id_dict.get(obj_id)
|
||||
retval = tm_packet.return_value
|
||||
assert retval is not None
|
||||
string_list = generic_retval_printout(retval)
|
||||
self.pw.dlog(f"Received Mode Reply from {obj_id_obj}: Can't reach mode.")
|
||||
for string in string_list:
|
||||
self.pw.dlog(f"Reason: {string}")
|
||||
return True
|
||||
if tm_packet.subservice == ModeSubservice.TM_WRONG_MODE_REPLY:
|
||||
if tm_packet.tm.subservice == ModeSubservice.TM_WRONG_MODE_REPLY:
|
||||
self.pw.dlog(f"Received Mode TM wrong mode reply, mode: {tm_packet.mode}")
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
|
@ -17,7 +17,7 @@ def handle_service_1_fsfw_packet(wrapper: VerificationWrapper, raw_tm: bytes):
|
||||
)
|
||||
# Error code with length 2 is FSFW specific
|
||||
tm_packet = Service1Tm.unpack(
|
||||
data=raw_tm, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2)
|
||||
data=raw_tm, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
|
||||
)
|
||||
fsfw_wrapper = Service1FsfwWrapper(tm_packet)
|
||||
res = wrapper.verificator.add_tm(tm_packet)
|
||||
|
@ -8,7 +8,7 @@ from socket import AF_INET
|
||||
from typing import Tuple
|
||||
|
||||
from tmtccmd.config.tmtc import CmdTreeNode
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||
from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd
|
||||
from tmtccmd.pus.s20_fsfw_param_defs import (
|
||||
@ -557,7 +557,7 @@ def handle_acs_ctrl_sus_raw_data(pw: PrintWrapper, hk_data: bytes):
|
||||
sus_list_formatted = vec_fmt.format(*sus_list)
|
||||
current_idx += length
|
||||
pw.dlog(f"SUS {idx} RAW: {sus_list_formatted}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=12))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=12))
|
||||
|
||||
|
||||
def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -593,7 +593,7 @@ def handle_acs_ctrl_sus_processed_data(pw: PrintWrapper, hk_data: bytes):
|
||||
sun_ijk_model = vec_fmt.format(*sun_ijk_model)
|
||||
current_idx += inc_len
|
||||
pw.dlog(f"{'SUS ijk Model'.ljust(25)}: {sun_ijk_model}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=15))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=15))
|
||||
|
||||
|
||||
def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -649,7 +649,7 @@ def handle_raw_mgm_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
|
||||
current_idx += 1
|
||||
assert current_idx == 61
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
|
||||
|
||||
|
||||
def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -703,7 +703,7 @@ def handle_mgm_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
current_idx += inc_len
|
||||
if PERFORM_MGM_CALIBRATION:
|
||||
perform_mgm_calibration(pw, mgm_3)
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=8))
|
||||
|
||||
|
||||
def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -737,7 +737,7 @@ def handle_gyr_data_raw(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"{'GYR 1 L3'.ljust(15)}: {float_str_fmt.format(*gyr_1_l3)}")
|
||||
pw.dlog(f"{'GYR 2 ADIS'.ljust(15)}: {float_str_fmt.format(*gyr_2_adis)}")
|
||||
pw.dlog(f"{'GYR 3 L3'.ljust(15)}: {float_str_fmt.format(*gyr_3_l3)}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 4))
|
||||
|
||||
|
||||
GYR_NAMES = ["GYR 0 ADIS", "GYR 1 L3", "GYR 2 ADIS", "GYR 3 L3"]
|
||||
@ -763,7 +763,7 @@ def handle_gyr_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
]
|
||||
pw.dlog(f"GYR Vec Total: {gyr_vec_tot}")
|
||||
current_idx += inc_len
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
|
||||
|
||||
|
||||
def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -826,7 +826,7 @@ def handle_gps_data_processed(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"GPS Altitude: {alt} [m]")
|
||||
pw.dlog(f"GPS Position: {pos} [m]")
|
||||
pw.dlog(f"GPS Velocity: {velo} [m/s]")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
|
||||
|
||||
|
||||
def handle_attitude_estimation_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -881,9 +881,9 @@ def handle_attitude_estimation_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
current_idx += inc_len_quat
|
||||
pw.dlog(f"{'QUEST Quaternion'.ljust(25)}: {fmt_str_4.format(*quest_quat)}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=4))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=4))
|
||||
return
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -940,7 +940,7 @@ def handle_ctrl_val_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"Control Values Error Quaternion: {err_quat}")
|
||||
pw.dlog(f"Control Values Error Angle: {err_ang} [deg]")
|
||||
pw.dlog(f"Control Values Target Rotational Rate: {tgt_rot} [deg/s]")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
|
||||
|
||||
|
||||
def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -979,7 +979,7 @@ def handle_act_cmd_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"Actuator Commands RW Target Torque: {rw_tgt_torque}")
|
||||
pw.dlog(f"Actuator Commands RW Target Speed: {rw_tgt_speed}")
|
||||
pw.dlog(f"Actuator Commands MTQ Target Dipole: {mtq_tgt_dipole}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -1030,9 +1030,9 @@ def handle_fused_rot_rate_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
else:
|
||||
pw.dlog(f"Ctrl Strategy (key unknown): {rot_rate_source}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=4))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=4))
|
||||
return
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_fused_rot_rate_source_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -1087,7 +1087,7 @@ def handle_fused_rot_rate_source_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"Fused Rotational Rate Total SUSMGM: {rot_rate_total_susmgm} [deg/s]")
|
||||
pw.dlog(f"Fused Rotational Rate Total QUEST: {rot_rate_total_quest} [deg/s]")
|
||||
pw.dlog(f"Fused Rotational Rate Total STR: {rot_rate_total_str} [deg/s]")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
|
||||
|
||||
|
||||
def handle_acs_ctrl_action_replies(
|
||||
|
@ -14,7 +14,7 @@ from tmtccmd.pus.tc.s3_fsfw_hk import (
|
||||
create_disable_periodic_hk_command_with_diag,
|
||||
)
|
||||
from tmtccmd.pus.tc.s8_fsfw_action import create_action_cmd
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -203,9 +203,7 @@ def handle_core_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)
|
||||
pw.dlog(f"GNSS Date: {date_string}")
|
||||
pw.dlog(f"Unix seconds {unix_seconds}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=14
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=14))
|
||||
|
||||
|
||||
def handle_skyview_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -265,6 +263,4 @@ def handle_skyview_data(pw: PrintWrapper, hk_data: bytes):
|
||||
used[idx],
|
||||
)
|
||||
)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=6
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=6))
|
||||
|
@ -31,7 +31,7 @@ from tmtccmd.pus.tc.s3_fsfw_hk import (
|
||||
)
|
||||
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -543,7 +543,7 @@ def handle_dipole_set(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"Dipole Y: {dipole_y}")
|
||||
pw.dlog(f"Dipole Z: {dipole_z}")
|
||||
pw.dlog(f"Current torque duration: {current_torque_duration}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 2)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[fmt_len:], 2))
|
||||
|
||||
|
||||
def unpack_eng_hk(hk_data: bytes) -> List:
|
||||
@ -583,9 +583,7 @@ def handle_eng_set(pw: PrintWrapper, hk_data: bytes, torque_on: bool):
|
||||
for k, v in zip(ENG_HK_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
)
|
||||
|
||||
|
||||
@ -597,9 +595,7 @@ def handle_status_set(pw: PrintWrapper, hk_data: bytes):
|
||||
for k, v in zip(STATUS_HEADERS, content_list):
|
||||
pw.dlog(f"{k.ljust(30)}: {v}")
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
)
|
||||
|
||||
|
||||
@ -620,9 +616,7 @@ def handle_calibrated_mtm_measurement(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
)
|
||||
|
||||
|
||||
@ -644,9 +638,7 @@ def handle_raw_mtm_measurement(pw: PrintWrapper, hk_data: bytes, torque_status:
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
)
|
||||
|
||||
|
||||
@ -783,7 +775,5 @@ def handle_self_test_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=num_of_vars
|
||||
)
|
||||
get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=num_of_vars)
|
||||
)
|
||||
|
@ -23,7 +23,7 @@ from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode, Subservice
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
|
||||
class OpCodesDev:
|
||||
@ -301,7 +301,7 @@ def handle_rw_hk_data(
|
||||
f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
|
||||
"1: High Current Mode (0.6 A)"
|
||||
)
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 5))
|
||||
if set_id == RwSetId.LAST_RESET:
|
||||
pw.dlog(
|
||||
f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
|
||||
@ -390,7 +390,7 @@ def handle_rw_hk_data(
|
||||
)
|
||||
if current_idx > 0:
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
get_validity_buffer_str(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
)
|
||||
)
|
||||
|
@ -27,7 +27,7 @@ from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from eive_tmtc.config.object_ids import STR_ASSEMBLY, STAR_TRACKER_ID
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -750,12 +750,13 @@ def create_update_firmware_target_cmd(
|
||||
else:
|
||||
param_id = ParamId.FIRMWARE_TARGET
|
||||
return create_load_param_cmd(
|
||||
create_scalar_u8_parameter(
|
||||
parameter=create_scalar_u8_parameter(
|
||||
STAR_TRACKER_ID,
|
||||
0,
|
||||
param_id,
|
||||
fw_target,
|
||||
)
|
||||
),
|
||||
apid=0,
|
||||
)
|
||||
|
||||
|
||||
@ -930,7 +931,7 @@ def handle_version_set(hk_data: bytes, pw: PrintWrapper):
|
||||
minor = struct.unpack("!B", hk_data[current_idx : current_idx + 1])[0]
|
||||
pw.dlog(f"Minor: {minor}")
|
||||
current_idx += 1
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=5))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=5))
|
||||
|
||||
|
||||
def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -947,7 +948,7 @@ def handle_temperature_set(hk_data: bytes, pw: PrintWrapper):
|
||||
pw.dlog(f"CMOS Temperature: {cmos_temp}")
|
||||
pw.dlog(f"FPGA Temperature: {fpga_temp}")
|
||||
current_idx += fmt_len
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 5)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 5))
|
||||
|
||||
|
||||
def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1021,7 +1022,7 @@ def handle_solution_set(hk_data: bytes, pw: PrintWrapper):
|
||||
solution_strategy = hk_data[current_idx]
|
||||
pw.dlog(f"Solution strategy: {solution_strategy}")
|
||||
current_idx += 1
|
||||
print(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 23))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 23))
|
||||
|
||||
|
||||
def handle_blob_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1032,7 +1033,7 @@ def handle_blob_set(hk_data: bytes, pw: PrintWrapper):
|
||||
current_idx = unpack_time_hk(hk_data, 0, pw)
|
||||
blob_count = struct.unpack("!I", hk_data[current_idx : current_idx + 4])[0]
|
||||
pw.dlog(f"Blob count: {blob_count}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx + 4 :], num_vars=3)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx + 4 :], num_vars=3))
|
||||
|
||||
|
||||
def handle_blobs_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1060,7 +1061,7 @@ def handle_blobs_set(hk_data: bytes, pw: PrintWrapper):
|
||||
for idx in range(8):
|
||||
pw.dlog("{:<8} {:<8}".format(x_coords[idx], y_coords[idx]))
|
||||
assert current_idx == len(hk_data) - 1
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=7)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=7))
|
||||
|
||||
|
||||
def handle_centroid_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1074,7 +1075,7 @@ def handle_centroid_set(hk_data: bytes, pw: PrintWrapper):
|
||||
current_idx += 4
|
||||
pw.dlog(f"Centroid count: {centroid_count}")
|
||||
assert current_idx == len(hk_data) - 1
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_centroids_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1101,7 +1102,7 @@ def handle_centroids_set(hk_data: bytes, pw: PrintWrapper):
|
||||
)
|
||||
)
|
||||
assert current_idx == len(hk_data) - 1
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
|
||||
|
||||
|
||||
def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1145,7 +1146,7 @@ def handle_matched_centroids_set(hk_data: bytes, pw: PrintWrapper):
|
||||
)
|
||||
)
|
||||
assert current_idx == len(hk_data) - 1
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=8)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=8))
|
||||
|
||||
|
||||
def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1163,7 +1164,7 @@ def handle_auto_blob_set(hk_data: bytes, pw: PrintWrapper):
|
||||
current_idx += inc_len
|
||||
assert current_idx == len(hk_data) - 1
|
||||
pw.dlog(f"Threshold {threshold}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_histo_or_contrast_set(name: str, hk_data: bytes, pw: PrintWrapper):
|
||||
@ -1239,7 +1240,7 @@ def handle_blob_stats_set(hk_data: bytes, pw: PrintWrapper):
|
||||
i, noise_list[i], threshold_list[i], lvalid_list[i], oflow_list[i]
|
||||
)
|
||||
)
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=6))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=6))
|
||||
|
||||
|
||||
def handle_star_tracker_action_replies(
|
||||
|
@ -3,7 +3,7 @@ import struct
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
@ -26,6 +26,6 @@ def handle_sus_hk(
|
||||
pw.dlog("AIN Channel | Raw Value (hex) | Raw Value (dec)")
|
||||
for idx, val in enumerate(channels):
|
||||
pw.dlog(f"{idx} | {val[0]:#06x} |" + str(val[0]).ljust(5))
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=7
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=7)
|
||||
)
|
||||
|
@ -78,12 +78,13 @@ def pack_pdec_handler_commands(
|
||||
pw = int(input("Specify positive window to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_u8_parameter(
|
||||
parameter=create_scalar_u8_parameter(
|
||||
object_id,
|
||||
0,
|
||||
ParameterId.POSITIVE_WINDOW,
|
||||
pw,
|
||||
)
|
||||
),
|
||||
apid=0,
|
||||
)
|
||||
)
|
||||
if cmd_str == OpCode.NEGATIVE_WINDOW:
|
||||
@ -91,12 +92,13 @@ def pack_pdec_handler_commands(
|
||||
nw = int(input("Specify negative window to set: "))
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_u8_parameter(
|
||||
parameter=create_scalar_u8_parameter(
|
||||
object_id,
|
||||
0,
|
||||
ParameterId.NEGATIVE_WINDOW,
|
||||
nw,
|
||||
)
|
||||
),
|
||||
apid=0,
|
||||
)
|
||||
)
|
||||
if cmd_str == OpCode.RESET_NO_INIT:
|
||||
|
@ -17,7 +17,7 @@ from tmtccmd.config.tmtc import (
|
||||
TmtcDefinitionWrapper,
|
||||
tmtc_definitions_provider,
|
||||
)
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, create_mode_command
|
||||
from tmtccmd.pus.tc.s3_fsfw_hk import (
|
||||
create_disable_periodic_hk_command_with_diag,
|
||||
@ -297,7 +297,7 @@ def handle_syrlinks_temp_dataset(hk_info: HkTmInfo, pw: PrintWrapper):
|
||||
temp_baseband_board = struct.unpack("!f", hk_data[4:8])[0]
|
||||
pw.dlog(f"Temperatur Power Amplifier [C]: {temp_power_amplifier}")
|
||||
pw.dlog(f"Temperatur Baseband Board [C]: {temp_baseband_board}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[8:], 2))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[8:], 2))
|
||||
|
||||
|
||||
def handle_syrlinks_rx_registers_dataset(
|
||||
@ -364,9 +364,7 @@ def handle_syrlinks_rx_registers_dataset(
|
||||
validity_buffer = hk_data[22:]
|
||||
for header, content in zip(header_list, content_list):
|
||||
pw.dlog(f"{header}: {content}")
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=8)
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=8))
|
||||
print(f"Carrier Detect: {carrier_detect}")
|
||||
print(f"Carrier Lock: {carrier_lock}")
|
||||
print(f"Data Lock (data clock recovery loop lock status): {data_lock}")
|
||||
@ -394,7 +392,7 @@ def handle_syrlinks_rx_registers_dataset(
|
||||
"INSERT INTO syrlinks_rx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
str(hk_info.packet_uuid),
|
||||
hk_info.hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore
|
||||
hk_info.packet_datetime,
|
||||
carrier_detect,
|
||||
carrier_lock,
|
||||
data_lock,
|
||||
@ -470,9 +468,7 @@ def handle_syrlinks_tx_registers_dataset(
|
||||
validity_buffer = hk_info.hk_data[4:]
|
||||
for header, content in zip(header_list, content_list):
|
||||
pw.dlog(f"{header}: {content}")
|
||||
pw.dlog(
|
||||
FsfwTmTcPrinter.get_validity_buffer(validity_buffer=validity_buffer, num_vars=3)
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=3))
|
||||
# pw.dlog(f"TX CONV: {tx_conv!r}")
|
||||
# pw.dlog(f"TX DIFF (differential encoder enable): {tx_diff_encoder_enable}")
|
||||
print(f"TX Status: {tx_status_status!r}")
|
||||
@ -504,7 +500,7 @@ def handle_syrlinks_tx_registers_dataset(
|
||||
"INSERT INTO syrlinks_tx_regs VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
str(hk_info.packet_uuid),
|
||||
hk_info.hk_packet.pus_tm.time_provider.as_datetime(), # type: ignore
|
||||
hk_info.packet_datetime,
|
||||
tx_status_status,
|
||||
tx_status_status.name,
|
||||
tx_conf_set,
|
||||
|
@ -15,7 +15,7 @@ from tmtccmd.pus.s20_fsfw_param import (
|
||||
create_scalar_u8_parameter,
|
||||
create_load_param_cmd,
|
||||
)
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.pus.s11_tc_sched import (
|
||||
create_enable_tc_sched_cmd,
|
||||
create_disable_tc_sched_cmd,
|
||||
@ -399,12 +399,13 @@ def pack_core_commands( # noqa C901
|
||||
raise ValueError("Only 0 or 1 allowed for preferred SD card")
|
||||
q.add_pus_tc(
|
||||
create_load_param_cmd(
|
||||
create_scalar_u8_parameter(
|
||||
parameter=create_scalar_u8_parameter(
|
||||
object_id=CORE_CONTROLLER_ID,
|
||||
domain_id=0,
|
||||
unique_id=ParamId.PREF_SD,
|
||||
parameter=pref_sd,
|
||||
)
|
||||
),
|
||||
apid=0,
|
||||
)
|
||||
)
|
||||
elif cmd_str == OpCode.CP_HELPER:
|
||||
@ -632,9 +633,7 @@ def handle_core_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
f"PL Voltage [mV] {pl_voltage}"
|
||||
)
|
||||
pw.dlog(printout)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[inc_len:], num_vars=3
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=hk_data[inc_len:], num_vars=3))
|
||||
|
||||
|
||||
def handle_core_ctrl_action_replies(
|
||||
|
@ -1,9 +1,8 @@
|
||||
import enum
|
||||
import struct
|
||||
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
@ -21,4 +20,4 @@ def handle_ier_hk_data(pw: PrintWrapper, hk_data: bytes, set_id: int):
|
||||
pw.dlog(f"TM Errors: {tm_errors}")
|
||||
pw.dlog(f"Queue Errors: {queue_errors}")
|
||||
pw.dlog(f"Store Errors: {store_hits}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 3))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[inc_len:], 3))
|
||||
|
@ -717,7 +717,7 @@ class DirElement:
|
||||
size: int
|
||||
|
||||
|
||||
def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytearray):
|
||||
def handle_mpsoc_data_reply(action_id: int, pw: PrintWrapper, custom_data: bytes):
|
||||
if action_id == ActionId.TM_MEM_READ_RPT:
|
||||
header_list = [
|
||||
"PLOC Memory Address",
|
||||
|
@ -19,7 +19,7 @@ from tmtccmd.config.tmtc import CmdTreeNode
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s200_fsfw_mode import pack_mode_data, Mode
|
||||
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from eive_tmtc.utility.input_helper import InputHelper
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -701,7 +701,7 @@ def handle_hk_report(hk_data: bytes, pw: PrintWrapper):
|
||||
pw.dlog(f"NVM 3 State {nvm_3_state}")
|
||||
pw.dlog(f"Mission IO state {mission_io_state}")
|
||||
pw.dlog(f"FMC state {fmc_state}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[inc_len:], 13))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[inc_len:], 13))
|
||||
|
||||
|
||||
def handle_boot_report(hk_data: bytes, pw: PrintWrapper):
|
||||
@ -730,7 +730,7 @@ def handle_boot_report(hk_data: bytes, pw: PrintWrapper):
|
||||
pw.dlog(f"Active NVM: {active_nvm}")
|
||||
pw.dlog(f"BP0 State {bp_0_state} | BP1 State {bp_1_state} | BP2 State {bp_2_state}")
|
||||
pw.dlog(f"Boot State {boot_state} | Boot Cycles {boot_cycles}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 10)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 10))
|
||||
|
||||
|
||||
def handle_adc_report(hk_data: bytes):
|
||||
|
@ -28,7 +28,7 @@ from tmtccmd.pus.s20_fsfw_param import (
|
||||
)
|
||||
from spacepackets.ecss.tc import PusTelecommand
|
||||
from eive_tmtc.config.object_ids import PL_PCDU_ID
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -615,8 +615,8 @@ def handle_plpcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
pw.dlog(ch_print)
|
||||
for i in range(12):
|
||||
pw.dlog(f"{ADC_CHANNELS_NAMED[i].ljust(24)} | {processed_vals[i]}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=3
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=3)
|
||||
)
|
||||
|
||||
|
||||
|
@ -18,7 +18,7 @@ from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
|
||||
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
|
||||
from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
|
||||
class SetId(enum.IntEnum):
|
||||
@ -115,6 +115,6 @@ def handle_rad_sensor_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
for idx, val in ain_dict.items():
|
||||
pw.dlog(f"{idx} | {val:#06x} | {str(val).ljust(5)}")
|
||||
current_idx += inc_len
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=7
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=7)
|
||||
)
|
||||
|
@ -82,7 +82,7 @@ def pack_acu_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, cmd_str:
|
||||
|
||||
|
||||
def acu_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
req_hk_cmds("ACU", q, op_code, ACU_HANDLER_ID, [SetId.CORE, SetId.AUX])
|
||||
req_hk_cmds("ACU", q, op_code, ACU_HANDLER_ID, (SetId.CORE, SetId.AUX))
|
||||
|
||||
|
||||
class ACUTestProcedure:
|
||||
|
@ -5,7 +5,7 @@ from spacepackets.ecss import PusTelecommand
|
||||
from tmtccmd.config.tmtc import (
|
||||
CmdTreeNode,
|
||||
)
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
|
||||
from tmtccmd.pus.s200_fsfw_mode import Subservice as ModeSubservices
|
||||
@ -213,9 +213,7 @@ def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
validity_buffer = hk_data[inc_len:]
|
||||
pw.dlog(str(HEADER_LIST))
|
||||
pw.dlog(str(content_list))
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=10
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=10))
|
||||
elif set_id == BpxSetId.GET_CFG_SET:
|
||||
battheat_mode = hk_data[0]
|
||||
battheat_low = struct.unpack("!b", hk_data[1:2])[0]
|
||||
@ -229,6 +227,4 @@ def handle_bpx_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
validity_buffer = hk_data[3:]
|
||||
pw.dlog(str(header_list))
|
||||
pw.dlog(str(content_list))
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=validity_buffer, num_vars=10
|
||||
)
|
||||
pw.dlog(get_validity_buffer_str(validity_buffer=validity_buffer, num_vars=10))
|
||||
|
@ -259,4 +259,4 @@ def create_p60_dock_node() -> CmdTreeNode:
|
||||
|
||||
|
||||
def p60_dock_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
req_hk_cmds("P60 Dock", q, op_code, P60_DOCK_HANDLER, [SetId.CORE, SetId.AUX])
|
||||
req_hk_cmds("P60 Dock", q, op_code, P60_DOCK_HANDLER, (SetId.CORE, SetId.AUX))
|
||||
|
@ -98,7 +98,7 @@ def pack_pdu1_commands(object_id: ObjectIdU32, q: DefaultPusQueueHelper, cmd_str
|
||||
|
||||
|
||||
def pdu1_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
req_hk_cmds("PDU1", q, op_code, PDU_1_HANDLER_ID, [SetId.CORE, SetId.AUX])
|
||||
req_hk_cmds("PDU1", q, op_code, PDU_1_HANDLER_ID, (SetId.CORE, SetId.AUX))
|
||||
|
||||
|
||||
def info_on_pdu1(base: str) -> str:
|
||||
|
@ -332,7 +332,7 @@ def add_pdu2_common_defs(oce: OpCodeEntry):
|
||||
|
||||
|
||||
def pdu2_req_hk_cmds(q: DefaultPusQueueHelper, op_code: str):
|
||||
req_hk_cmds("PDU2", q, op_code, PDU_2_HANDLER_ID, [SetId.CORE, SetId.AUX])
|
||||
req_hk_cmds("PDU2", q, op_code, PDU_2_HANDLER_ID, (SetId.CORE, SetId.AUX))
|
||||
|
||||
|
||||
def pl_pcdu_bat_nom_on_cmd(q: DefaultPusQueueHelper):
|
||||
|
@ -6,7 +6,7 @@ import struct
|
||||
from tmtccmd.config.tmtc import (
|
||||
CmdTreeNode,
|
||||
)
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.pus.s20_fsfw_param import create_load_param_cmd
|
||||
from tmtccmd.pus.s20_fsfw_param_defs import (
|
||||
create_scalar_double_parameter,
|
||||
@ -268,7 +268,7 @@ def handle_core_hk_data(pw: PrintWrapper, hk_data: bytes):
|
||||
pw.dlog(f"Total Battery Current: {total_battery_current} [mA]")
|
||||
pw.dlog(f"Open Circuit Voltage Charge: {open_circuit_voltage_charge*100:8.3f} [%]")
|
||||
pw.dlog(f"Coulomb Counter Charge: {coulomb_counter_charge*100:8.3f} [%]")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=3)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=3))
|
||||
|
||||
|
||||
def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
|
||||
@ -284,4 +284,4 @@ def handle_enable_pl_data(pw: PrintWrapper, hk_data: bytes):
|
||||
)[0]
|
||||
current_idx += inc_len_uint16
|
||||
pw.dlog(f"PL Use Allowed: {pl_use_allowed}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], num_vars=1)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], num_vars=1))
|
||||
|
@ -13,7 +13,7 @@ from eive_tmtc.tmtc.power.common_power import (
|
||||
)
|
||||
from eive_tmtc.tmtc.power.power import PcduSetIds
|
||||
from tmtccmd.util import ObjectIdBase
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter, get_validity_buffer_str
|
||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||
from eive_tmtc.gomspace.gomspace_common import GomspaceDeviceActionId
|
||||
from eive_tmtc.config.object_ids import (
|
||||
@ -125,6 +125,8 @@ class DevicesInfoParser:
|
||||
return current_idx
|
||||
|
||||
def print(self, pw: PrintWrapper):
|
||||
if self.dev_types is None or self.dev_statuses is None:
|
||||
return
|
||||
pw.dlog("Device Type | Device State (0:None | 1:OK | 3:ERROR | 4:NOT FOUND)")
|
||||
for i in range(len(self.dev_types)):
|
||||
pw.dlog(
|
||||
@ -372,8 +374,8 @@ def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
temps = f"In C: Temp 0 {temp_0} | Temp 1 {temp_1} | "
|
||||
pw.dlog(temps)
|
||||
pw.dlog(batt_info)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=9
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=9)
|
||||
)
|
||||
if set_id == SetId.AUX:
|
||||
pw.dlog("Received P60 AUX HK. Voltages in mV, currents in mA")
|
||||
@ -440,8 +442,8 @@ def handle_p60_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
"6:TempSens(BatPack)|7:TempSens(BatPack)"
|
||||
)
|
||||
dev_parser.print(pw=pw)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=27
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=27)
|
||||
)
|
||||
|
||||
|
||||
@ -493,8 +495,8 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
f"Boot Count {bootcnt} | Uptime {uptime} sec | "
|
||||
f"MPPT Time {mppt_time} msec | MPPT Period {mppt_period} msec"
|
||||
)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=12
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=12)
|
||||
)
|
||||
if set_id == SetId.AUX:
|
||||
current_idx = 0
|
||||
@ -531,13 +533,13 @@ def handle_acu_hk_data(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
"ACU Dev Types: 0:FRAM|1:ADC|2:ADC|3:DAC|4:DAC|5:DAC|6:TempSens|7:Reserved"
|
||||
)
|
||||
dev_parser.print(pw=pw)
|
||||
FsfwTmTcPrinter.get_validity_buffer(
|
||||
validity_buffer=hk_data[current_idx:], num_vars=8
|
||||
pw.dlog(
|
||||
get_validity_buffer_str(validity_buffer=hk_data[current_idx:], num_vars=8)
|
||||
)
|
||||
|
||||
|
||||
def handle_get_param_data_reply(
|
||||
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytearray
|
||||
obj_id: ObjectIdBase, action_id: int, pw: PrintWrapper, custom_data: bytes
|
||||
):
|
||||
if action_id == GomspaceDeviceActionId.PARAM_GET:
|
||||
pw.dlog(f"Parameter Get Request received for object {obj_id}")
|
||||
@ -677,4 +679,4 @@ def handle_pcdu_hk(pw: PrintWrapper, set_id: int, hk_data: bytes):
|
||||
pw.dlog(f"{name.ljust(25)}: {val}")
|
||||
pw.dlog(f"{'P60 Dock 5V Stack'.ljust(25)}: {p60_stack_5v_val}")
|
||||
pw.dlog(f"{'P60 Dock 3V3 Stack'.ljust(25)}: {p60_stack_3v3_val}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[current_idx:], 4)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[current_idx:], 4))
|
||||
|
@ -13,7 +13,7 @@ from tmtccmd.util import ObjectIdU32
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data, Subservice
|
||||
import eive_tmtc.config.object_ids as oids
|
||||
from eive_tmtc.config.object_ids import get_object_ids
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
|
||||
RTD_IDS = [
|
||||
oids.RTD_0_PLOC_HSPD,
|
||||
@ -170,7 +170,7 @@ def handle_rtd_hk(object_id: bytes, hk_data: bytes, pw: PrintWrapper):
|
||||
pw.dlog(f"RTD Value: {rtd_val}")
|
||||
pw.dlog(f"Error Byte: {error_byte}")
|
||||
pw.dlog(f"Last Error Byte: {last_err_byte}")
|
||||
FsfwTmTcPrinter.get_validity_buffer(hk_data[fmt_len:], 4)
|
||||
pw.dlog(get_validity_buffer_str(hk_data[fmt_len:], 4))
|
||||
|
||||
|
||||
def prompt_rtd_idx():
|
||||
|
@ -18,7 +18,7 @@ from tmtccmd.config.tmtc import (
|
||||
TmtcDefinitionWrapper,
|
||||
OpCodeEntry,
|
||||
)
|
||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||
from tmtccmd.fsfw.tmtc_printer import get_validity_buffer_str
|
||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||
from tmtccmd.pus.s200_fsfw_mode import Mode, pack_mode_data
|
||||
from tmtccmd.pus.tc.s3_fsfw_hk import create_request_one_hk_command, make_sid
|
||||
@ -96,4 +96,4 @@ def handle_tmp_1075_hk_data(set_id: int, hk_data: bytes, pw: PrintWrapper):
|
||||
if set_id == SetId.TEMPERATURE:
|
||||
temp = struct.unpack("!f", hk_data[0:4])[0]
|
||||
pw.dlog(f"TMP1075 Temperature: {temp}")
|
||||
pw.dlog(FsfwTmTcPrinter.get_validity_buffer(hk_data[4:], 1))
|
||||
pw.dlog(get_validity_buffer_str(hk_data[4:], 1))
|
||||
|
@ -6,32 +6,32 @@ build-backend = "setuptools.build_meta"
|
||||
name = "eive-tmtc"
|
||||
description = "TMTC Commander EIVE"
|
||||
readme = "README.md"
|
||||
version = "7.0.0"
|
||||
version = "7.2.0"
|
||||
requires-python = ">=3.10"
|
||||
license = {text = "Apache-2.0"}
|
||||
authors = [
|
||||
{name = "Robin Mueller", email = "muellerr@irs.uni-stuttgart.de"},
|
||||
{name = "Jakob Meier", email = "meierj@irs.uni-stuttgart.de"},
|
||||
{name = "Robin Mueller", email = "muellerr@irs.uni-stuttgart.de"},
|
||||
{name = "Jakob Meier", email = "meierj@irs.uni-stuttgart.de"},
|
||||
]
|
||||
keywords = ["eive", "space", "communication", "commanding"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Natural Language :: English",
|
||||
"Operating System :: POSIX",
|
||||
"Operating System :: Microsoft :: Windows",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Topic :: Communications",
|
||||
"Topic :: Software Development :: Libraries",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
"Topic :: Scientific/Engineering"
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Natural Language :: English",
|
||||
"Operating System :: POSIX",
|
||||
"Operating System :: Microsoft :: Windows",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Topic :: Communications",
|
||||
"Topic :: Software Development :: Libraries",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
"Topic :: Scientific/Engineering"
|
||||
]
|
||||
dependencies = [
|
||||
"tmtccmd == 8.0.0rc1",
|
||||
"cfdp-py~=0.1.0",
|
||||
"tmtccmd ~= 8.1",
|
||||
# "tmtccmd @ git+https://github.com/robamu-org/tmtccmd@main",
|
||||
"cfdp-py>=0.4, <=0.5",
|
||||
"python-dateutil ~= 2.8",
|
||||
]
|
||||
|
||||
@ -45,7 +45,7 @@ include-package-data = true
|
||||
[tool.setuptools.packages]
|
||||
find = {}
|
||||
|
||||
[tool.ruff]
|
||||
[tool.ruff.lint]
|
||||
ignore = ["E501"]
|
||||
[tool.ruff.extend-per-file-ignores]
|
||||
[tool.ruff.lint.extend-per-file-ignores]
|
||||
"__init__.py" = ["F401"]
|
||||
|
4
tmtcc.py
4
tmtcc.py
@ -97,8 +97,8 @@ def setup_params() -> Tuple[SetupWrapper, int]:
|
||||
hk_level = int(post_arg_parsing_wrapper.args_raw.hk)
|
||||
else:
|
||||
hk_level = 0
|
||||
if params.app_params.print_tree:
|
||||
perform_tree_printout(params.app_params, hook_obj.get_command_definitions())
|
||||
if params.cmd_params.print_tree:
|
||||
perform_tree_printout(params.cmd_params, hook_obj.get_command_definitions())
|
||||
sys.exit(0)
|
||||
params.apid = PUS_APID
|
||||
if params.com_if is None:
|
||||
|
Loading…
x
Reference in New Issue
Block a user