new update for ringbuf

This commit is contained in:
Robin Müller 2025-01-11 11:38:04 +01:00
parent badeea8071
commit 4b318ecc76
Signed by: muellerr
GPG Key ID: A649FB78196E3849
4 changed files with 76 additions and 129 deletions

View File

@ -35,8 +35,6 @@ features = ["critical-section"]
[dependencies.ringbuf]
version = "0.4.7"
default-features = false
git = "https://github.com/robamu/ringbuf.git"
branch = "remove-mut-on-split-ref"
features = ["portable-atomic"]
[dependencies.va108xx-hal]

View File

@ -5,25 +5,17 @@
#![no_main]
#![no_std]
use once_cell::sync::Lazy;
use ringbuf::StaticRb;
// Larger buffer for TC to be able to hold the possibly large memory write packets.
const RX_RING_BUF_SIZE: usize = 1024;
// Ring buffers to handling variable sized telemetry
static RINGBUF: Lazy<StaticRb<u8, RX_RING_BUF_SIZE>> =
Lazy::new(StaticRb::<u8, RX_RING_BUF_SIZE>::default);
#[rtic::app(device = pac, dispatchers = [OC4])]
mod app {
use super::*;
use embedded_io::Write;
use panic_rtt_target as _;
use ringbuf::{
traits::{Consumer, Observer, Producer, SplitRef},
CachingCons, StaticProd,
};
use ringbuf::traits::{Consumer, Observer, Producer};
use rtic_example::SYSCLK_FREQ;
use rtic_monotonics::Monotonic;
use rtt_target::{rprintln, rtt_init_print};
@ -36,14 +28,14 @@ mod app {
#[local]
struct Local {
data_producer: StaticProd<'static, u8, RX_RING_BUF_SIZE>,
data_consumer: CachingCons<&'static StaticRb<u8, RX_RING_BUF_SIZE>>,
rx: RxWithIrq<pac::Uarta>,
tx: Tx<pac::Uarta>,
}
#[shared]
struct Shared {}
struct Shared {
rb: StaticRb<u8, RX_RING_BUF_SIZE>,
}
rtic_monotonics::systick_monotonic!(Mono, 1_000);
@ -71,13 +63,12 @@ mod app {
rx.start();
let (data_producer, data_consumer) = RINGBUF.split_ref();
echo_handler::spawn().unwrap();
(
Shared {},
Shared {
rb: StaticRb::default(),
},
Local {
data_producer,
data_consumer,
rx,
tx,
},
@ -94,24 +85,23 @@ mod app {
#[task(
binds = OC3,
shared = [],
shared = [rb],
local = [
rx,
data_producer
],
)]
fn reception_task(cx: reception_task::Context) {
fn reception_task(mut cx: reception_task::Context) {
let mut buf: [u8; 16] = [0; 16];
let mut ringbuf_full = false;
let result = cx.local.rx.irq_handler(&mut buf);
if result.bytes_read > 0 && result.errors.is_none() {
if cx.local.data_producer.vacant_len() < result.bytes_read {
ringbuf_full = true;
} else {
cx.local
.data_producer
.push_slice(&buf[0..result.bytes_read]);
}
cx.shared.rb.lock(|rb| {
if rb.vacant_len() < result.bytes_read {
ringbuf_full = true;
} else {
rb.push_slice(&buf[0..result.bytes_read]);
}
});
}
if ringbuf_full {
// Could also drop oldest data, but that would require the consumer to be shared.
@ -119,24 +109,23 @@ mod app {
}
}
#[task(shared = [], local = [
#[task(shared = [rb], local = [
buf: [u8; RX_RING_BUF_SIZE] = [0; RX_RING_BUF_SIZE],
data_consumer,
tx
], priority=1)]
async fn echo_handler(cx: echo_handler::Context) {
async fn echo_handler(mut cx: echo_handler::Context) {
loop {
let bytes_to_read = cx.local.data_consumer.occupied_len();
if bytes_to_read > 0 {
let actual_read_bytes = cx
.local
.data_consumer
.pop_slice(&mut cx.local.buf[0..bytes_to_read]);
cx.local
.tx
.write_all(&cx.local.buf[0..actual_read_bytes])
.expect("Failed to write to TX");
}
cx.shared.rb.lock(|rb| {
let bytes_to_read = rb.occupied_len();
if bytes_to_read > 0 {
let actual_read_bytes = rb.pop_slice(&mut cx.local.buf[0..bytes_to_read]);
cx.local
.tx
.write_all(&cx.local.buf[0..actual_read_bytes])
.expect("Failed to write to TX");
}
});
Mono::delay(50.millis()).await;
}
}

View File

@ -25,8 +25,6 @@ version = "0.4"
[dependencies.ringbuf]
version = "0.4.7"
default-features = false
git = "https://github.com/robamu/ringbuf.git"
branch = "remove-mut-on-split-ref"
features = ["portable-atomic"]
[dependencies.once_cell]

View File

@ -4,11 +4,10 @@
#![no_std]
use num_enum::TryFromPrimitive;
use once_cell::sync::Lazy;
use panic_rtt_target as _;
use ringbuf::{
traits::{Consumer, Observer, Producer, SplitRef},
CachingCons, StaticProd, StaticRb,
traits::{Consumer, Observer, Producer},
StaticRb,
};
use va108xx_hal::prelude::*;
@ -44,26 +43,9 @@ const SIZES_RB_SIZE_TC: usize = 16;
const BUF_RB_SIZE_TM: usize = 256;
const SIZES_RB_SIZE_TM: usize = 16;
// Ring buffers to handling variable sized telemetry
static BUF_RB_TM: Lazy<StaticRb<u8, BUF_RB_SIZE_TM>> =
Lazy::new(StaticRb::<u8, BUF_RB_SIZE_TM>::default);
static SIZES_RB_TM: Lazy<StaticRb<usize, SIZES_RB_SIZE_TM>> =
Lazy::new(StaticRb::<usize, SIZES_RB_SIZE_TM>::default);
// Ring buffers to handling variable sized telecommands
static BUF_RB_TC: Lazy<StaticRb<u8, BUF_RB_SIZE_TC>> =
Lazy::new(StaticRb::<u8, BUF_RB_SIZE_TC>::default);
static SIZES_RB_TC: Lazy<StaticRb<usize, SIZES_RB_SIZE_TC>> =
Lazy::new(StaticRb::<usize, SIZES_RB_SIZE_TC>::default);
pub struct DataProducer<const BUF_SIZE: usize, const SIZES_LEN: usize> {
pub buf_prod: StaticProd<'static, u8, BUF_SIZE>,
pub sizes_prod: StaticProd<'static, usize, SIZES_LEN>,
}
pub struct DataConsumer<const BUF_SIZE: usize, const SIZES_LEN: usize> {
pub buf_cons: CachingCons<&'static StaticRb<u8, BUF_SIZE>>,
pub sizes_cons: CachingCons<&'static StaticRb<usize, SIZES_LEN>>,
pub struct RingBufWrapper<const BUF_SIZE: usize, const SIZES_LEN: usize> {
pub buf: StaticRb<u8, BUF_SIZE>,
pub sizes: StaticRb<usize, SIZES_LEN>,
}
pub const APP_A_START_ADDR: u32 = 0x3000;
@ -105,12 +87,6 @@ mod app {
uart_rx: uart::RxWithIrq<pac::Uarta>,
uart_tx: uart::Tx<pac::Uarta>,
rx_context: IrqContextTimeoutOrMaxSize,
// We handle all TM in one task.
tm_cons: DataConsumer<BUF_RB_SIZE_TM, SIZES_RB_SIZE_TM>,
// We consume all TC in one task.
tc_cons: DataConsumer<BUF_RB_SIZE_TC, SIZES_RB_SIZE_TC>,
// We produce all TC in one task.
tc_prod: DataProducer<BUF_RB_SIZE_TC, SIZES_RB_SIZE_TC>,
verif_reporter: VerificationReportCreator,
nvm: M95M01,
}
@ -118,7 +94,8 @@ mod app {
#[shared]
struct Shared {
// Having this shared allows multiple tasks to generate telemetry.
tm_prod: DataProducer<BUF_RB_SIZE_TM, SIZES_RB_SIZE_TM>,
tm_rb: RingBufWrapper<BUF_RB_SIZE_TM, SIZES_RB_SIZE_TM>,
tc_rb: RingBufWrapper<BUF_RB_SIZE_TC, SIZES_RB_SIZE_TC>,
}
rtic_monotonics::systick_monotonic!(Mono, 1000);
@ -149,12 +126,6 @@ mod app {
let verif_reporter = VerificationReportCreator::new(0).unwrap();
let (buf_prod_tm, buf_cons_tm) = BUF_RB_TM.split_ref();
let (sizes_prod_tm, sizes_cons_tm) = SIZES_RB_TM.split_ref();
let (buf_prod_tc, buf_cons_tc) = BUF_RB_TC.split_ref();
let (sizes_prod_tc, sizes_cons_tc) = SIZES_RB_TC.split_ref();
let mut rx_context = IrqContextTimeoutOrMaxSize::new(MAX_TC_FRAME_SIZE);
rx.read_fixed_len_or_timeout_based_using_irq(&mut rx_context)
.expect("initiating UART RX failed");
@ -162,27 +133,19 @@ mod app {
pus_tm_tx_handler::spawn().unwrap();
(
Shared {
tm_prod: DataProducer {
buf_prod: buf_prod_tm,
sizes_prod: sizes_prod_tm,
tc_rb: RingBufWrapper {
buf: StaticRb::default(),
sizes: StaticRb::default(),
},
tm_rb: RingBufWrapper {
buf: StaticRb::default(),
sizes: StaticRb::default(),
},
},
Local {
uart_rx: rx,
uart_tx: tx,
rx_context,
tm_cons: DataConsumer {
buf_cons: buf_cons_tm,
sizes_cons: sizes_cons_tm,
},
tc_cons: DataConsumer {
buf_cons: buf_cons_tc,
sizes_cons: sizes_cons_tc,
},
tc_prod: DataProducer {
buf_prod: buf_prod_tc,
sizes_prod: sizes_prod_tc,
},
verif_reporter,
nvm,
},
@ -205,10 +168,10 @@ mod app {
rx_buf: [u8; MAX_TC_FRAME_SIZE] = [0; MAX_TC_FRAME_SIZE],
rx_context,
uart_rx,
tc_prod
],
shared = [tc_rb]
)]
fn uart_rx_irq(cx: uart_rx_irq::Context) {
fn uart_rx_irq(mut cx: uart_rx_irq::Context) {
match cx
.local
.uart_rx
@ -231,16 +194,17 @@ mod app {
log::warn!("COBS decoding failed");
} else {
let decoded_size = decoded_size.unwrap();
if cx.local.tc_prod.sizes_prod.vacant_len() >= 1
&& cx.local.tc_prod.buf_prod.vacant_len() >= decoded_size
{
// Should never fail, we checked there is enough space.
cx.local.tc_prod.sizes_prod.try_push(decoded_size).unwrap();
cx.local
.tc_prod
.buf_prod
.push_slice(&cx.local.rx_buf[1..1 + decoded_size]);
} else {
let mut tc_rb_full = false;
cx.shared.tc_rb.lock(|rb| {
if rb.sizes.vacant_len() >= 1 && rb.buf.vacant_len() >= decoded_size
{
rb.sizes.try_push(decoded_size).unwrap();
rb.buf.push_slice(&cx.local.rx_buf[1..1 + decoded_size]);
} else {
tc_rb_full = true;
}
});
if tc_rb_full {
log::warn!("COBS TC queue full");
}
}
@ -271,16 +235,15 @@ mod app {
readback_buf: [u8; MAX_TC_SIZE] = [0; MAX_TC_SIZE],
src_data_buf: [u8; 16] = [0; 16],
verif_buf: [u8; 32] = [0; 32],
tc_cons,
nvm,
verif_reporter
],
shared=[tm_prod]
shared=[tm_rb, tc_rb]
)]
async fn pus_tc_handler(mut cx: pus_tc_handler::Context) {
loop {
// Try to read a TC from the ring buffer.
let packet_len = cx.local.tc_cons.sizes_cons.try_pop();
let packet_len = cx.shared.tc_rb.lock(|rb| rb.sizes.try_pop());
if packet_len.is_none() {
// Small delay, TCs might arrive very quickly.
Mono::delay(20.millis()).await;
@ -288,13 +251,11 @@ mod app {
}
let packet_len = packet_len.unwrap();
log::info!(target: "TC Handler", "received packet with length {}", packet_len);
assert_eq!(
cx.local
.tc_cons
.buf_cons
.pop_slice(&mut cx.local.tc_buf[0..packet_len]),
packet_len
);
let popped_packet_len = cx.shared.tc_rb.lock(|rb| {
rb.buf
.pop_slice(&mut cx.local.tc_buf[0..packet_len])
});
assert_eq!(popped_packet_len, packet_len);
// Read a telecommand, now handle it.
handle_valid_pus_tc(&mut cx);
}
@ -309,9 +270,9 @@ mod app {
let (pus_tc, _) = pus_tc.unwrap();
let mut write_and_send = |tm: &PusTmCreator| {
let written_size = tm.write_to_bytes(cx.local.verif_buf).unwrap();
cx.shared.tm_prod.lock(|prod| {
prod.sizes_prod.try_push(tm.len_written()).unwrap();
prod.buf_prod
cx.shared.tm_rb.lock(|prod| {
prod.sizes.try_push(tm.len_written()).unwrap();
prod.buf
.push_slice(&cx.local.verif_buf[0..written_size]);
});
};
@ -475,18 +436,18 @@ mod app {
read_buf: [u8;MAX_TM_SIZE] = [0; MAX_TM_SIZE],
encoded_buf: [u8;MAX_TM_FRAME_SIZE] = [0; MAX_TM_FRAME_SIZE],
uart_tx,
tm_cons
],
shared=[]
shared=[tm_rb]
)]
async fn pus_tm_tx_handler(cx: pus_tm_tx_handler::Context) {
async fn pus_tm_tx_handler(mut cx: pus_tm_tx_handler::Context) {
loop {
while cx.local.tm_cons.sizes_cons.occupied_len() > 0 {
let next_size = cx.local.tm_cons.sizes_cons.try_pop().unwrap();
cx.local
.tm_cons
.buf_cons
.pop_slice(&mut cx.local.read_buf[0..next_size]);
let mut occupied_len = cx.shared.tm_rb.lock(|rb| rb.sizes.occupied_len());
while occupied_len > 0 {
let next_size = cx.shared.tm_rb.lock(|rb| {
let next_size = rb.sizes.try_pop().unwrap();
rb.buf.pop_slice(&mut cx.local.read_buf[0..next_size]);
next_size
});
cx.local.encoded_buf[0] = 0;
let send_size = cobs::encode(
&cx.local.read_buf[0..next_size],
@ -497,6 +458,7 @@ mod app {
.uart_tx
.write(&cx.local.encoded_buf[0..send_size + 2])
.unwrap();
occupied_len -= 1;
Mono::delay(2.millis()).await;
}
Mono::delay(50.millis()).await;