update flashloader

This commit is contained in:
Robin Müller 2025-04-17 19:59:09 +02:00
parent 71fa3c5820
commit 0df05d8fba
Signed by: muellerr
GPG Key ID: A649FB78196E3849
5 changed files with 32 additions and 32 deletions

View File

@ -2,16 +2,15 @@
from typing import List, Tuple
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.tm import PusTm
from tmtccmd.com import ComInterface
import toml
import struct
import logging
import argparse
import time
import enum
from tmtccmd.com.serial_base import SerialCfg
from tmtccmd.com.serial_cobs import SerialCobsComIF
from tmtccmd.com.ser_utils import prompt_com_port
from com_interface import ComInterface
from com_interface.serial_base import SerialCfg
from com_interface.serial_cobs import SerialCobsComIF
from crcmod.predefined import PredefinedCrc
from spacepackets.ecss.tc import PusTc
from spacepackets.ecss.pus_verificator import PusVerificator, StatusField
@ -101,15 +100,7 @@ class ImageLoader:
)
self.verificator.add_tc(action_tc)
self.com_if.send(bytes(action_tc.pack()))
data_available = self.com_if.data_available(0.4)
if not data_available:
_LOGGER.warning("no reply received for boot image selection command")
for reply in self.com_if.receive():
result = self.verificator.add_tm(
Service1Tm.from_tm(PusTm.unpack(reply, 0), UnpackParams(0))
)
if result is not None and result.completed:
_LOGGER.info("received boot image selection command confirmation")
self.await_for_command_copletion("boot image selection command")
def handle_ping_cmd(self):
_LOGGER.info("Sending ping command")
@ -122,16 +113,26 @@ class ImageLoader:
)
self.verificator.add_tc(ping_tc)
self.com_if.send(bytes(ping_tc.pack()))
self.await_for_command_copletion("ping command")
data_available = self.com_if.data_available(0.4)
if not data_available:
_LOGGER.warning("no ping reply received")
def await_for_command_copletion(self, context: str):
done = False
now = time.time()
while time.time() - now < 2.0:
if not self.com_if.data_available():
time.sleep(0.2)
continue
for reply in self.com_if.receive():
result = self.verificator.add_tm(
Service1Tm.from_tm(PusTm.unpack(reply, 0), UnpackParams(0))
)
if result is not None and result.completed:
_LOGGER.info("received ping completion reply")
_LOGGER.info(f"received {context} reply")
done = True
if done:
break
if not done:
_LOGGER.warning(f"no {context} reply received")
def handle_corruption_cmd(self, target: Target):
if target == Target.BOOTLOADER:
@ -300,12 +301,12 @@ def main() -> int:
if "serial_port" in parsed_toml:
serial_port = parsed_toml["serial_port"]
if serial_port is None:
serial_port = prompt_com_port()
serial_port = input("Please specify the serial port manually: ")
serial_cfg = SerialCfg(
com_if_id="ser_cobs",
serial_port=serial_port,
baud_rate=BAUD_RATE,
serial_timeout=0.1,
polling_frequency=0.1,
)
verificator = PusVerificator()
com_if = SerialCobsComIF(serial_cfg)

View File

@ -1 +1 @@
serial_port = "/dev/ttyUSB0"
serial_port = "/dev/ttyUSB1"

View File

@ -1,5 +1,5 @@
spacepackets == 0.24
tmtccmd == 8.0.2
spacepackets == 0.28
com-interface == 0.1
toml == 0.10
pyelftools == 0.31
crcmod == 1.7

View File

@ -451,7 +451,7 @@ mod app {
cx.local.encoded_buf[send_size + 1] = 0;
cx.local
.uart_tx
.write(&cx.local.encoded_buf[0..send_size + 2])
.write_all(&cx.local.encoded_buf[0..send_size + 2])
.unwrap();
occupied_len -= 1;
Mono::delay(2.millis()).await;

View File

@ -1149,8 +1149,7 @@ impl RxWithInterrupt {
let mut result = IrqResultMaxSizeOrTimeout::default();
let irq_status = self.0.regs.read_irq_status();
let irq_enabled = self.0.regs.read_irq_enabled();
let rx_enabled = irq_enabled.rx();
let rx_enabled = self.0.regs.read_enable().rx();
// Half-Full interrupt. We have a guaranteed amount of data we can read.
if irq_status.rx() {
@ -1187,8 +1186,8 @@ impl RxWithInterrupt {
// While there is data in the FIFO, write it into the reception buffer
match self.0.read() {
Ok(byte) => {
buf[result.bytes_read] = byte;
result.bytes_read += 1;
buf[context.rx_idx] = byte;
context.rx_idx += 1;
}
Err(_) => break,
}