start switching to defmt

This commit is contained in:
2025-04-22 14:58:07 +02:00
parent 1a5670b362
commit 3b5e7a9af3
35 changed files with 317 additions and 240 deletions

View File

@ -2,16 +2,15 @@
from typing import List, Tuple
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.tm import PusTm
from tmtccmd.com import ComInterface
from com_interface import ComInterface
import toml
import struct
import logging
import argparse
import time
import enum
from tmtccmd.com.serial_base import SerialCfg
from tmtccmd.com.serial_cobs import SerialCobsComIF
from tmtccmd.com.ser_utils import prompt_com_port
from com_interface.serial_base import SerialCfg
from com_interface.serial_cobs import SerialCobsComIF
from crcmod.predefined import PredefinedCrc
from spacepackets.ecss.tc import PusTc
from spacepackets.ecss.pus_verificator import PusVerificator, StatusField
@ -58,6 +57,7 @@ PING_PAYLOAD_SIZE = 0
class ActionId(enum.IntEnum):
CORRUPT_APP_A = 128
CORRUPT_APP_B = 129
SET_BOOT_SLOT = 130
_LOGGER = logging.getLogger(__name__)
@ -78,11 +78,29 @@ class Target(enum.Enum):
APP_B = 2
class AppSel(enum.IntEnum):
APP_A = 0
APP_B = 1
class ImageLoader:
def __init__(self, com_if: ComInterface, verificator: PusVerificator) -> None:
self.com_if = com_if
self.verificator = verificator
def handle_boot_sel_cmd(self, target: AppSel):
_LOGGER.info("Sending ping command")
action_tc = PusTc(
apid=0x00,
service=PusService.S8_FUNC_CMD,
subservice=ActionId.SET_BOOT_SLOT,
seq_count=SEQ_PROVIDER.get_and_increment(),
app_data=bytes([target]),
)
self.verificator.add_tc(action_tc)
self.com_if.send(bytes(action_tc.pack()))
self.await_for_command_copletion("boot image selection command")
def handle_ping_cmd(self):
_LOGGER.info("Sending ping command")
ping_tc = PusTc(
@ -94,19 +112,9 @@ class ImageLoader:
)
self.verificator.add_tc(ping_tc)
self.com_if.send(bytes(ping_tc.pack()))
data_available = self.com_if.data_available(0.4)
if not data_available:
_LOGGER.warning("no ping reply received")
for reply in self.com_if.receive():
result = self.verificator.add_tm(
Service1Tm.from_tm(PusTm.unpack(reply, 0), UnpackParams(0))
)
if result is not None and result.completed:
_LOGGER.info("received ping completion reply")
self.await_for_command_copletion("ping command")
def handle_corruption_cmd(self, target: Target):
if target == Target.BOOTLOADER:
_LOGGER.error("can not corrupt bootloader")
if target == Target.APP_A:
@ -126,6 +134,25 @@ class ImageLoader:
),
)
def await_for_command_copletion(self, context: str):
done = False
now = time.time()
while time.time() - now < 2.0:
if not self.com_if.data_available():
time.sleep(0.2)
continue
for reply in self.com_if.receive():
result = self.verificator.add_tm(
Service1Tm.from_tm(PusTm.unpack(reply, 0), UnpackParams(0))
)
if result is not None and result.completed:
_LOGGER.info(f"received {context} reply")
done = True
if done:
break
if not done:
_LOGGER.warning(f"no {context} reply received")
def handle_flash_cmd(self, target: Target, file_path: Path) -> int:
loadable_segments = []
_LOGGER.info("Parsing ELF file for loadable sections")
@ -269,12 +296,12 @@ def main() -> int:
if "serial_port" in parsed_toml:
serial_port = parsed_toml["serial_port"]
if serial_port is None:
serial_port = prompt_com_port()
serial_port = input("Please specify the serial port manually: ")
serial_cfg = SerialCfg(
com_if_id="ser_cobs",
serial_port=serial_port,
baud_rate=BAUD_RATE,
serial_timeout=0.1,
polling_frequency=0.1,
)
verificator = PusVerificator()
com_if = SerialCobsComIF(serial_cfg)