bump pytmtc for embedded examples
All checks were successful
Rust/sat-rs/pipeline/head This commit looks good

This commit is contained in:
Robin Müller 2024-05-19 21:53:28 +02:00
parent fd2a8f3e99
commit 809de2d0d5
Signed by: muellerr
GPG Key ID: A649FB78196E3849
2 changed files with 16 additions and 15 deletions

View File

@ -9,7 +9,7 @@ from prompt_toolkit.history import FileHistory, History
from spacepackets.ecss.tm import CdsShortTimestamp
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusTm, PusVerificator
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
@ -43,7 +43,7 @@ from tmtccmd.tmtc import (
DefaultPusQueueHelper,
)
from tmtccmd.pus.s5_fsfw_event import Service5Tm
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
_LOGGER = logging.getLogger()
@ -53,7 +53,7 @@ EXAMPLE_PUS_APID = 0x02
class SatRsConfigHook(HookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
super().__init__(json_cfg_path)
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import (
@ -111,9 +111,10 @@ class PusHandler(SpecificApidHandlerBase):
self.verif_wrapper = verif_wrapper
def handle_tm(self, packet: bytes, _user_args: Any):
time_reader = CdsShortTimestamp.empty()
try:
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
pus_tm = PusTm.unpack(
packet, timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE
)
except ValueError as e:
_LOGGER.warning("Could not generate PUS TM object from raw data")
_LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
@ -122,7 +123,7 @@ class PusHandler(SpecificApidHandlerBase):
tm_packet = None
if service == 1:
tm_packet = Service1Tm.unpack(
data=packet, params=UnpackParams(time_reader, 1, 2)
data=packet, params=UnpackParams(CdsShortTimestamp.TIMESTAMP_SIZE, 1, 2)
)
res = self.verif_wrapper.add_tm(tm_packet)
if res is None:
@ -139,16 +140,16 @@ class PusHandler(SpecificApidHandlerBase):
if service == 3:
_LOGGER.info("No handling for HK packets implemented")
_LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(packet, CdsShortTimestamp.empty())
pus_tm = PusTelemetry.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
json_str = pus_tm.source_data[8:]
_LOGGER.info("received JSON string: " + json_str.decode("utf-8"))
if service == 5:
tm_packet = Service5Tm.unpack(packet, time_reader)
tm_packet = Service5Tm.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
if service == 17:
tm_packet = Service17Tm.unpack(packet, time_reader)
tm_packet = Service17Tm.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
if tm_packet.subservice == 2:
_LOGGER.info("Received Ping Reply TM[17,2]")
else:
@ -159,7 +160,7 @@ class PusHandler(SpecificApidHandlerBase):
_LOGGER.info(
f"The service {service} is not implemented in Telemetry Factory"
)
tm_packet = PusTelemetry.unpack(packet, time_reader)
tm_packet = PusTelemetry.unpack(packet, CdsShortTimestamp.TIMESTAMP_SIZE)
self.raw_logger.log_tm(pus_tm)
@ -203,15 +204,15 @@ class TcHandler(TcHandlerBase):
_LOGGER.info(log_entry.log_str)
def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
q = self.queue_helper
q.queue_wrapper = wrapper.queue_wrapper
if info.proc_type == TcProcedureType.DEFAULT:
def_proc = info.to_def_procedure()
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
cmd_path = def_proc.cmd_path
if cmd_path == "/ping":
q.add_log_cmd("Sending PUS ping telecommand")

View File

@ -1,2 +1,2 @@
tmtccmd == 8.0.0rc.0
tmtccmd == 8.0.1
# -e git+https://github.com/robamu-org/tmtccmd.git@main#egg=tmtccmd