Implementation of scheduler in pus and tmtc handler #29
@ -3,12 +3,96 @@ use crate::pool::{PoolProvider, StoreAddr, StoreError};
|
||||
use alloc::collections::btree_map::{Entry, Range};
|
||||
use alloc::vec;
|
||||
use alloc::vec::Vec;
|
||||
use core::fmt::{Debug, Display, Formatter};
|
||||
use core::time::Duration;
|
||||
use spacepackets::time::UnixTimestamp;
|
||||
use spacepackets::ecss::{PusError, PusPacket};
|
||||
use spacepackets::tc::PusTc;
|
||||
use spacepackets::time::cds::DaysLen24Bits;
|
||||
use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp};
|
||||
use std::collections::BTreeMap;
|
||||
#[cfg(feature = "std")]
|
||||
lkoester marked this conversation as resolved
Outdated
|
||||
use std::error::Error;
|
||||
#[cfg(feature = "std")]
|
||||
use std::time::SystemTimeError;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ScheduleError {
|
||||
PusError(PusError),
|
||||
TimeMarginTooShort(UnixTimestamp, UnixTimestamp),
|
||||
NestedScheduledTc,
|
||||
lkoester marked this conversation as resolved
Outdated
muellerr
commented
TC -> Tc for consistency TC -> Tc for consistency
lkoester
commented
Passt Passt
|
||||
StoreError(StoreError),
|
||||
TcDataEmpty,
|
||||
TimestampError(TimestampError),
|
||||
WrongSubservice,
|
||||
WrongService,
|
||||
}
|
||||
|
||||
impl Display for ScheduleError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
ScheduleError::PusError(e) => {
|
||||
write!(f, "Pus Error: {}", e)
|
||||
}
|
||||
ScheduleError::TimeMarginTooShort(current_time, timestamp) => {
|
||||
write!(
|
||||
f,
|
||||
"Error: time margin too short, current time: {:?}, time stamp: {:?}",
|
||||
current_time, timestamp
|
||||
)
|
||||
}
|
||||
ScheduleError::NestedScheduledTc => {
|
||||
write!(f, "Error: nested scheduling is not allowed")
|
||||
}
|
||||
ScheduleError::StoreError(e) => {
|
||||
write!(f, "Store Error: {}", e)
|
||||
}
|
||||
ScheduleError::TcDataEmpty => {
|
||||
write!(f, "Error: empty Tc Data field")
|
||||
}
|
||||
ScheduleError::TimestampError(e) => {
|
||||
write!(f, "Timestamp Error: {}", e)
|
||||
}
|
||||
ScheduleError::WrongService => {
|
||||
write!(f, "Error: Service not 11.")
|
||||
}
|
||||
ScheduleError::WrongSubservice => {
|
||||
write!(f, "Error: Subservice not 4.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PusError> for ScheduleError {
|
||||
fn from(e: PusError) -> Self {
|
||||
ScheduleError::PusError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StoreError> for ScheduleError {
|
||||
fn from(e: StoreError) -> Self {
|
||||
ScheduleError::StoreError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TimestampError> for ScheduleError {
|
||||
fn from(e: TimestampError) -> Self {
|
||||
ScheduleError::TimestampError(e)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl Error for ScheduleError {}
|
||||
|
||||
//TODO: Move to spacepackets
|
||||
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||
pub enum ScheduleSubservice {
|
||||
EnableScheduling = 1,
|
||||
DisableScheduling = 2,
|
||||
ResetScheduling = 3,
|
||||
InsertActivity = 4,
|
||||
DeleteActivity = 5,
|
||||
}
|
||||
|
||||
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
|
||||
///
|
||||
/// It is assumed that the actual telecommand data is stored in a separate TC pool offering
|
||||
@ -82,7 +166,8 @@ impl PusScheduler {
|
||||
/// The holding store for the telecommands needs to be passed so all the stored telecommands
|
||||
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
|
||||
/// will be returned but the method will still try to delete all the commands in the schedule.
|
||||
pub fn reset(&mut self, store: &mut impl PoolProvider) -> Result<(), StoreError> {
|
||||
|
||||
pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
|
||||
self.enabled = false;
|
||||
let mut deletion_ok = Ok(());
|
||||
for tc_lists in &mut self.tc_map {
|
||||
@ -105,9 +190,16 @@ impl PusScheduler {
|
||||
&self.current_time
|
||||
}
|
||||
|
||||
pub fn insert_tc(&mut self, time_stamp: UnixTimestamp, addr: StoreAddr) -> bool {
|
||||
pub fn insert_unwrapped_and_stored_tc(
|
||||
&mut self,
|
||||
time_stamp: UnixTimestamp,
|
||||
addr: StoreAddr,
|
||||
) -> Result<(), ScheduleError> {
|
||||
if time_stamp < self.current_time + self.time_margin {
|
||||
return false;
|
||||
return Err(ScheduleError::TimeMarginTooShort(
|
||||
self.current_time,
|
||||
time_stamp,
|
||||
));
|
||||
}
|
||||
match self.tc_map.entry(time_stamp) {
|
||||
Entry::Vacant(e) => {
|
||||
@ -117,7 +209,65 @@ impl PusScheduler {
|
||||
v.get_mut().push(addr);
|
||||
}
|
||||
}
|
||||
true
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_unwrapped_tc(
|
||||
&mut self,
|
||||
time_stamp: UnixTimestamp,
|
||||
tc: &[u8],
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<StoreAddr, ScheduleError> {
|
||||
let check_tc = PusTc::from_bytes(tc)?;
|
||||
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
|
||||
return Err(ScheduleError::NestedScheduledTc);
|
||||
}
|
||||
lkoester marked this conversation as resolved
Outdated
muellerr
commented
TODO can be removed probably TODO can be removed probably
|
||||
|
||||
match pool.add(tc) {
|
||||
Ok(addr) => {
|
||||
self.insert_unwrapped_and_stored_tc(time_stamp, addr)?;
|
||||
Ok(addr)
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
// <T: FnMut(&[u8]) -> (&dyn CcsdsTimeProvider)>
|
||||
pub fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
|
||||
&mut self,
|
||||
pus_tc: &PusTc,
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<StoreAddr, ScheduleError> {
|
||||
if PusPacket::service(pus_tc) != 11 {
|
||||
return Err(ScheduleError::WrongService);
|
||||
}
|
||||
if PusPacket::subservice(pus_tc) != 4 {
|
||||
return Err(ScheduleError::WrongSubservice);
|
||||
}
|
||||
return if let Some(user_data) = pus_tc.user_data() {
|
||||
let stamp: TimeStamp = TimeReader::from_bytes(user_data)?;
|
||||
let unix_stamp = stamp.unix_stamp();
|
||||
let stamp_len = stamp.len_as_bytes();
|
||||
self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
|
||||
} else {
|
||||
Err(ScheduleError::TcDataEmpty)
|
||||
};
|
||||
}
|
||||
|
||||
pub fn insert_wrapped_tc_cds_short(
|
||||
&mut self,
|
||||
pus_tc: &PusTc,
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<StoreAddr, ScheduleError> {
|
||||
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(pus_tc, pool)
|
||||
}
|
||||
|
||||
pub fn insert_wrapped_tc_cds_long(
|
||||
&mut self,
|
||||
pus_tc: &PusTc,
|
||||
pool: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<StoreAddr, ScheduleError> {
|
||||
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
|
||||
}
|
||||
|
||||
pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<StoreAddr>> {
|
||||
@ -133,18 +283,19 @@ impl PusScheduler {
|
||||
|
||||
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
|
||||
/// closure for each telecommand which should be released. This function will also delete
|
||||
/// the telecommands from the holding store after calling the release closure.
|
||||
/// the telecommands from the holding store after calling the release closure, if the scheduler
|
||||
/// is disabled.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
|
||||
/// the second argument is the store address. This closure should return whether the
|
||||
/// command should be deleted.
|
||||
/// command should be deleted if the scheduler is disabled to prevent memory leaks.
|
||||
/// * `store` - The holding store of the telecommands.
|
||||
pub fn release_telecommands<R: FnMut(bool, &StoreAddr) -> bool>(
|
||||
&mut self,
|
||||
mut releaser: R,
|
||||
tc_store: &mut impl PoolProvider,
|
||||
tc_store: &mut (impl PoolProvider + ?Sized),
|
||||
) -> Result<u64, (u64, StoreError)> {
|
||||
let tcs_to_release = self.telecommands_to_release();
|
||||
let mut released_tcs = 0;
|
||||
@ -153,7 +304,7 @@ impl PusScheduler {
|
||||
for addr in tc.1 {
|
||||
let should_delete = releaser(self.enabled, addr);
|
||||
released_tcs += 1;
|
||||
if should_delete {
|
||||
if should_delete && !self.is_enabled() {
|
||||
let res = tc_store.delete(*addr);
|
||||
if res.is_err() {
|
||||
store_error = res;
|
||||
@ -171,10 +322,20 @@ impl PusScheduler {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr};
|
||||
use crate::pus::scheduling::PusScheduler;
|
||||
use alloc::vec::Vec;
|
||||
use spacepackets::time::UnixTimestamp;
|
||||
use crate::pus::scheduling::{PusScheduler, ScheduleError};
|
||||
use crate::tmtc::ccsds_distrib::tests::generate_ping_tc;
|
||||
use alloc::rc::Rc;
|
||||
use core::borrow::BorrowMut;
|
||||
use spacepackets::ecss::PacketTypeCodes::UnsignedInt;
|
||||
use spacepackets::ecss::PusPacket;
|
||||
use spacepackets::tc::PusTc;
|
||||
use spacepackets::time::{cds, TimeWriter, UnixTimestamp};
|
||||
use spacepackets::{CcsdsPacket, SpHeader};
|
||||
use std::cell::RefCell;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::{channel, Receiver, TryRecvError};
|
||||
use std::time::Duration;
|
||||
use std::vec::Vec;
|
||||
#[allow(unused_imports)]
|
||||
use std::{println, vec};
|
||||
|
||||
@ -194,19 +355,29 @@ mod tests {
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let first_addr = pool.add(&[0, 1, 2]).unwrap();
|
||||
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone());
|
||||
|
||||
assert!(worked);
|
||||
scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(100),
|
||||
first_addr.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let second_addr = pool.add(&[2, 3, 4]).unwrap();
|
||||
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone());
|
||||
|
||||
assert!(worked);
|
||||
scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(200),
|
||||
second_addr.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let third_addr = pool.add(&[5, 6, 7]).unwrap();
|
||||
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone());
|
||||
|
||||
assert!(worked);
|
||||
scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(300),
|
||||
third_addr.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
|
||||
assert!(scheduler.is_enabled());
|
||||
@ -223,35 +394,35 @@ mod tests {
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let worked = scheduler.insert_tc(
|
||||
UnixTimestamp::new_only_seconds(100),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 1,
|
||||
},
|
||||
);
|
||||
scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(100),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 1,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(worked);
|
||||
let worked = scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(100),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 2,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let worked = scheduler.insert_tc(
|
||||
UnixTimestamp::new_only_seconds(100),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 2,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(worked);
|
||||
|
||||
let worked = scheduler.insert_tc(
|
||||
UnixTimestamp::new_only_seconds(300),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 2,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(worked);
|
||||
let worked = scheduler
|
||||
.insert_unwrapped_and_stored_tc(
|
||||
UnixTimestamp::new_only_seconds(300),
|
||||
StoreAddr {
|
||||
pool_idx: 0,
|
||||
packet_idx: 2,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
|
||||
}
|
||||
@ -275,6 +446,17 @@ mod tests {
|
||||
assert!(expected_store_addrs.contains(store_addr));
|
||||
*counter += 1;
|
||||
}
|
||||
fn common_check_disabled(
|
||||
enabled: bool,
|
||||
store_addr: &StoreAddr,
|
||||
expected_store_addrs: Vec<StoreAddr>,
|
||||
counter: &mut usize,
|
||||
) {
|
||||
assert_eq!(enabled, false);
|
||||
assert!(expected_store_addrs.contains(store_addr));
|
||||
*counter += 1;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn release_basic() {
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
@ -282,10 +464,11 @@ mod tests {
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let first_addr = pool.add(&[2, 2, 2]).unwrap();
|
||||
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr);
|
||||
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
|
||||
|
||||
let second_addr = pool.add(&[5, 6, 7]).unwrap();
|
||||
scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr);
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr);
|
||||
|
||||
let mut i = 0;
|
||||
let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
@ -307,7 +490,7 @@ mod tests {
|
||||
.release_telecommands(&mut test_closure_1, &mut pool)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 1);
|
||||
assert!(!pool.has_element_at(&first_addr).unwrap());
|
||||
assert!(pool.has_element_at(&first_addr).unwrap());
|
||||
|
||||
// test 3, late timestamp, release 1 overdue tc
|
||||
let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
@ -321,7 +504,7 @@ mod tests {
|
||||
.release_telecommands(&mut test_closure_2, &mut pool)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 1);
|
||||
assert!(!pool.has_element_at(&second_addr).unwrap());
|
||||
assert!(pool.has_element_at(&second_addr).unwrap());
|
||||
|
||||
//test 4: no tcs left
|
||||
scheduler
|
||||
@ -339,10 +522,11 @@ mod tests {
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let first_addr = pool.add(&[2, 2, 2]).unwrap();
|
||||
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr);
|
||||
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
|
||||
|
||||
let second_addr = pool.add(&[2, 2, 2]).unwrap();
|
||||
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), second_addr);
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr);
|
||||
|
||||
let mut i = 0;
|
||||
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
@ -365,8 +549,8 @@ mod tests {
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 2);
|
||||
assert!(!pool.has_element_at(&first_addr).unwrap());
|
||||
assert!(!pool.has_element_at(&second_addr).unwrap());
|
||||
assert!(pool.has_element_at(&first_addr).unwrap());
|
||||
assert!(pool.has_element_at(&second_addr).unwrap());
|
||||
|
||||
//test 3: no tcs left
|
||||
released = scheduler
|
||||
@ -377,4 +561,261 @@ mod tests {
|
||||
// check that 2 total tcs have been released
|
||||
assert_eq!(i, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn release_with_scheduler_disabled() {
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
scheduler.disable();
|
||||
|
||||
let first_addr = pool.add(&[2, 2, 2]).unwrap();
|
||||
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
|
||||
|
||||
let second_addr = pool.add(&[5, 6, 7]).unwrap();
|
||||
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr);
|
||||
|
||||
let mut i = 0;
|
||||
let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i);
|
||||
true
|
||||
};
|
||||
|
||||
// test 1: too early, no tcs
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
|
||||
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure_1, &mut pool)
|
||||
.expect("deletion failed");
|
||||
|
||||
// test 2: exact time stamp of tc, releases 1 tc
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(100));
|
||||
|
||||
let mut released = scheduler
|
||||
.release_telecommands(&mut test_closure_1, &mut pool)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 1);
|
||||
assert!(!pool.has_element_at(&first_addr).unwrap());
|
||||
|
||||
// test 3, late timestamp, release 1 overdue tc
|
||||
let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i);
|
||||
true
|
||||
};
|
||||
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(206));
|
||||
|
||||
released = scheduler
|
||||
.release_telecommands(&mut test_closure_2, &mut pool)
|
||||
.expect("deletion failed");
|
||||
assert_eq!(released, 1);
|
||||
assert!(!pool.has_element_at(&second_addr).unwrap());
|
||||
|
||||
//test 4: no tcs left
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure_2, &mut pool)
|
||||
.expect("deletion failed");
|
||||
|
||||
// check that 2 total tcs have been released
|
||||
assert_eq!(i, 2);
|
||||
}
|
||||
|
||||
fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
|
||||
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap();
|
||||
|
||||
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
|
||||
|
||||
let len_packet = base_ping_tc_simple_ctor()
|
||||
.write_to_bytes(&mut buf[len_time_stamp..])
|
||||
.unwrap();
|
||||
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
|
||||
|
||||
PusTc::new_simple(
|
||||
&mut sph,
|
||||
11,
|
||||
4,
|
||||
Some(&buf[..len_packet + len_time_stamp]),
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
fn wrong_tc_service(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
|
||||
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap();
|
||||
|
||||
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
|
||||
|
||||
let len_packet = base_ping_tc_simple_ctor()
|
||||
.write_to_bytes(&mut buf[len_time_stamp..])
|
||||
.unwrap();
|
||||
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
|
||||
|
||||
PusTc::new_simple(
|
||||
&mut sph,
|
||||
12,
|
||||
4,
|
||||
Some(&buf[..len_packet + len_time_stamp]),
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
fn wrong_tc_subservice(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
|
||||
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(×tamp).unwrap();
|
||||
|
||||
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
|
||||
|
||||
let len_packet = base_ping_tc_simple_ctor()
|
||||
.write_to_bytes(&mut buf[len_time_stamp..])
|
||||
.unwrap();
|
||||
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
|
||||
|
||||
PusTc::new_simple(
|
||||
&mut sph,
|
||||
11,
|
||||
5,
|
||||
Some(&buf[..len_packet + len_time_stamp]),
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
fn base_ping_tc_simple_ctor() -> PusTc<'static> {
|
||||
let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap();
|
||||
PusTc::new_simple(&mut sph, 17, 1, None, true)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_unwrapped_tc() {
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
let mut buf: [u8; 32] = [0; 32];
|
||||
let len = base_ping_tc_simple_ctor().write_to_bytes(&mut buf).unwrap();
|
||||
|
||||
let addr = scheduler
|
||||
.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool)
|
||||
.unwrap();
|
||||
|
||||
assert!(pool.has_element_at(&addr).unwrap());
|
||||
|
||||
let data = pool.read(&addr).unwrap();
|
||||
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
|
||||
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(101));
|
||||
|
||||
let mut addr_vec = vec::Vec::new();
|
||||
|
||||
let mut i = 0;
|
||||
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
common_check(boolvar, store_addr, vec![addr], &mut i);
|
||||
// check that tc remains unchanged
|
||||
addr_vec.push(*store_addr);
|
||||
false
|
||||
};
|
||||
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.unwrap();
|
||||
|
||||
let data = pool.read(&addr_vec[0]).unwrap();
|
||||
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_wrapped_tc() {
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
|
||||
let mut buf: [u8; 32] = [0; 32];
|
||||
let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf);
|
||||
|
||||
let addr = match scheduler
|
||||
.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool)
|
||||
{
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
println!("{}", e);
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
|
||||
assert!(pool.has_element_at(&addr).unwrap());
|
||||
|
||||
let data = pool.read(&addr).unwrap();
|
||||
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
|
||||
|
||||
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
|
||||
|
||||
scheduler.update_time(UnixTimestamp::new_only_seconds(101));
|
||||
|
||||
let mut addr_vec = vec::Vec::new();
|
||||
|
||||
let mut i = 0;
|
||||
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
|
||||
common_check(boolvar, store_addr, vec![addr], &mut i);
|
||||
// check that tc remains unchanged
|
||||
addr_vec.push(*store_addr);
|
||||
false
|
||||
};
|
||||
|
||||
scheduler
|
||||
.release_telecommands(&mut test_closure, &mut pool)
|
||||
.unwrap();
|
||||
|
||||
let data = pool.read(&addr_vec[0]).unwrap();
|
||||
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
|
||||
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_wrong_service() {
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
|
||||
let mut buf: [u8; 32] = [0; 32];
|
||||
let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf);
|
||||
|
||||
match scheduler.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool) {
|
||||
Ok(_) => {
|
||||
panic!();
|
||||
}
|
||||
Err(e) => {
|
||||
if e != ScheduleError::WrongService {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_wrong_subservice() {
|
||||
let mut scheduler =
|
||||
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
|
||||
|
||||
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
|
||||
|
||||
let mut buf: [u8; 32] = [0; 32];
|
||||
let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf);
|
||||
|
||||
match scheduler.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool) {
|
||||
Ok(_) => {
|
||||
panic!();
|
||||
}
|
||||
Err(e) => {
|
||||
if e != ScheduleError::WrongSubservice {
|
||||
panic!();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,18 +5,21 @@ import struct
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
import datetime
|
||||
|
||||
import tmtccmd
|
||||
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
|
||||
from spacepackets.ecss.pus_17_test import Service17Tm
|
||||
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
|
||||
from spacepackets.ccsds.time import CdsShortTimestamp
|
||||
|
||||
from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
|
||||
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
|
||||
from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd
|
||||
from tmtccmd.core.base import BackendRequest
|
||||
from tmtccmd.pus import VerificationWrapper
|
||||
from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase
|
||||
from tmtccmd.com_if import ComInterface
|
||||
from tmtccmd.com import ComInterface
|
||||
from tmtccmd.config import (
|
||||
default_json_path,
|
||||
SetupParams,
|
||||
@ -41,7 +44,7 @@ from tmtccmd.tc import (
|
||||
SendCbParams,
|
||||
DefaultPusQueueHelper,
|
||||
)
|
||||
from tmtccmd.tm.pus_5_event import Service5Tm
|
||||
from tmtccmd.tm.pus_5_fsfw_event import Service5Tm
|
||||
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
|
||||
from tmtccmd.util.obj_id import ObjectIdDictT
|
||||
|
||||
@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase):
|
||||
super().__init__(json_cfg_path=json_cfg_path)
|
||||
|
||||
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
|
||||
from tmtccmd.config.com_if import (
|
||||
from tmtccmd.config.com import (
|
||||
create_com_interface_default,
|
||||
create_com_interface_cfg_default,
|
||||
)
|
||||
@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase):
|
||||
info="PUS Service 3 Housekeeping",
|
||||
op_code_entry=srv_3
|
||||
)
|
||||
srv_11 = OpCodeEntry()
|
||||
srv_11.add("0", "Scheduled TC Test")
|
||||
defs.add_service(
|
||||
name=CoreServiceList.SERVICE_11,
|
||||
info="PUS Service 11 TC Scheduling",
|
||||
op_code_entry=srv_11,
|
||||
)
|
||||
return defs
|
||||
|
||||
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
|
||||
@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
|
||||
def handle_tm(self, packet: bytes, _user_args: any):
|
||||
try:
|
||||
tm_packet = PusTelemetry.unpack(packet)
|
||||
tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
|
||||
except ValueError as e:
|
||||
LOGGER.warning("Could not generate PUS TM object from raw data")
|
||||
LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
|
||||
@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
service = tm_packet.service
|
||||
dedicated_handler = False
|
||||
if service == 1:
|
||||
tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2))
|
||||
tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2))
|
||||
res = self.verif_wrapper.add_tm(tm_packet)
|
||||
if res is None:
|
||||
LOGGER.info(
|
||||
@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase):
|
||||
if service == 3:
|
||||
LOGGER.info("No handling for HK packets implemented")
|
||||
LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
|
||||
pus_tm = PusTelemetry.unpack(packet)
|
||||
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
|
||||
if pus_tm.subservice == 25:
|
||||
if len(pus_tm.source_data) < 8:
|
||||
raise ValueError("No addressable ID in HK packet")
|
||||
json_str = pus_tm.source_data[8:]
|
||||
dedicated_handler = True
|
||||
if service == 5:
|
||||
tm_packet = Service5Tm.unpack(packet)
|
||||
tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
|
||||
if service == 17:
|
||||
tm_packet = Service17Tm.unpack(packet)
|
||||
tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
|
||||
dedicated_handler = True
|
||||
if tm_packet.subservice == 2:
|
||||
self.printer.file_logger.info("Received Ping Reply TM[17,2]")
|
||||
@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase):
|
||||
self.verif_wrapper = verif_wrapper
|
||||
self.queue_helper = DefaultPusQueueHelper(
|
||||
queue_wrapper=None,
|
||||
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
|
||||
seq_cnt_provider=seq_count_provider,
|
||||
pus_verificator=self.verif_wrapper.pus_verificator,
|
||||
default_pus_apid=EXAMPLE_PUS_APID
|
||||
)
|
||||
|
||||
def send_cb(self, send_params: SendCbParams):
|
||||
@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase):
|
||||
if entry_helper.is_tc:
|
||||
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
|
||||
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
|
||||
pus_tc_wrapper.pus_tc.seq_count = (
|
||||
self.seq_count_provider.get_and_increment()
|
||||
)
|
||||
self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc)
|
||||
raw_tc = pus_tc_wrapper.pus_tc.pack()
|
||||
LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
|
||||
send_params.com_if.send(raw_tc)
|
||||
@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase):
|
||||
return q.add_pus_tc(
|
||||
PusTelecommand(service=17, subservice=1)
|
||||
)
|
||||
if service == CoreServiceList.SERVICE_11:
|
||||
q.add_log_cmd("Sending PUS scheduled TC telecommand")
|
||||
crt_time = CdsShortTimestamp.from_now()
|
||||
time_stamp = crt_time + datetime.timedelta(seconds=10)
|
||||
time_stamp = time_stamp.pack()
|
||||
return q.add_pus_tc(
|
||||
create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID)
|
||||
)
|
||||
if service == CoreServiceList.SERVICE_3:
|
||||
if op_code in HkOpCodes.GENERATE_ONE_SHOT:
|
||||
q.add_log_cmd("Sending HK one shot request")
|
||||
|
@ -1 +1,2 @@
|
||||
tmtccmd == 3.0.0
|
||||
# tmtccmd == 4.0.0a2
|
||||
-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd
|
||||
|
@ -6,6 +6,7 @@ use satrs_core::pool::StoreAddr;
|
||||
use satrs_core::pus::event::Subservices;
|
||||
use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken};
|
||||
use satrs_core::pus::hk;
|
||||
use satrs_core::pus::scheduling::PusScheduler;
|
||||
use satrs_core::pus::verification::{
|
||||
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
|
||||
};
|
||||
@ -17,7 +18,9 @@ use satrs_core::{
|
||||
spacepackets::time::TimeWriter, spacepackets::SpHeader,
|
||||
};
|
||||
use satrs_example::{hk_err, tmtc_err};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
pub struct PusReceiver {
|
||||
@ -31,6 +34,7 @@ pub struct PusReceiver {
|
||||
request_map: HashMap<u32, Sender<RequestWithToken>>,
|
||||
stamper: TimeProvider,
|
||||
time_stamp: [u8; 7],
|
||||
scheduler: Rc<RefCell<PusScheduler>>,
|
||||
}
|
||||
|
||||
impl PusReceiver {
|
||||
@ -42,6 +46,7 @@ impl PusReceiver {
|
||||
tc_source: PusTcSource,
|
||||
event_request_tx: Sender<EventRequestWithToken>,
|
||||
request_map: HashMap<u32, Sender<RequestWithToken>>,
|
||||
scheduler: Rc<RefCell<PusScheduler>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tm_helper: PusTmWithCdsShortHelper::new(apid),
|
||||
@ -53,6 +58,7 @@ impl PusReceiver {
|
||||
request_map,
|
||||
stamper: TimeProvider::new_with_u16_days(0, 0),
|
||||
time_stamp: [0; 7],
|
||||
scheduler,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -78,6 +84,8 @@ impl PusServiceProvider for PusReceiver {
|
||||
self.handle_event_request(pus_tc, accepted_token);
|
||||
} else if service == 3 {
|
||||
self.handle_hk_request(pus_tc, accepted_token);
|
||||
} else if service == 11 {
|
||||
self.handle_scheduled_tc(pus_tc, accepted_token);
|
||||
} else {
|
||||
self.update_time_stamp();
|
||||
self.verif_reporter
|
||||
@ -201,6 +209,7 @@ impl PusReceiver {
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_event_request(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
|
||||
let send_start_failure = |verif_reporter: &mut StdVerifReporterWithSender,
|
||||
timestamp: &[u8; 7],
|
||||
@ -273,4 +282,112 @@ impl PusReceiver {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
|
||||
if pus_tc.user_data().is_none() {
|
||||
self.update_time_stamp();
|
||||
self.verif_reporter
|
||||
.start_failure(
|
||||
token,
|
||||
FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None),
|
||||
)
|
||||
.expect("Sending start failure TM failed");
|
||||
return;
|
||||
}
|
||||
|
||||
self.update_time_stamp();
|
||||
match pus_tc.subservice() {
|
||||
1 => {
|
||||
let start_token = self
|
||||
.verif_reporter
|
||||
.start_success(token, Some(&self.time_stamp))
|
||||
.expect("Error sending start success");
|
||||
|
||||
let mut scheduler = self.scheduler.borrow_mut();
|
||||
scheduler.enable();
|
||||
if scheduler.is_enabled() {
|
||||
self.verif_reporter
|
||||
.completion_success(start_token, Some(&self.time_stamp))
|
||||
.expect("Error sending completion success");
|
||||
} else {
|
||||
panic!("Failed to enable scheduler");
|
||||
}
|
||||
drop(scheduler);
|
||||
}
|
||||
2 => {
|
||||
let start_token = self
|
||||
.verif_reporter
|
||||
.start_success(token, Some(&self.time_stamp))
|
||||
.expect("Error sending start success");
|
||||
|
||||
let mut scheduler = self.scheduler.borrow_mut();
|
||||
scheduler.disable();
|
||||
if !scheduler.is_enabled() {
|
||||
self.verif_reporter
|
||||
.completion_success(start_token, Some(&self.time_stamp))
|
||||
.expect("Error sending completion success");
|
||||
} else {
|
||||
panic!("Failed to disable scheduler");
|
||||
}
|
||||
drop(scheduler);
|
||||
}
|
||||
3 => {
|
||||
let start_token = self
|
||||
.verif_reporter
|
||||
.start_success(token, Some(&self.time_stamp))
|
||||
.expect("Error sending start success");
|
||||
|
||||
let mut pool = self
|
||||
.tc_source
|
||||
.tc_store
|
||||
.pool
|
||||
.write()
|
||||
.expect("Locking pool failed");
|
||||
|
||||
let mut scheduler = self.scheduler.borrow_mut();
|
||||
scheduler
|
||||
.reset(pool.as_mut())
|
||||
.expect("Error resetting TC Pool");
|
||||
drop(scheduler);
|
||||
|
||||
self.verif_reporter
|
||||
.completion_success(start_token, Some(&self.time_stamp))
|
||||
.expect("Error sending completion success");
|
||||
}
|
||||
4 => {
|
||||
let start_token = self
|
||||
.verif_reporter
|
||||
.start_success(token, Some(&self.time_stamp))
|
||||
.expect("Error sending start success");
|
||||
|
||||
let mut pool = self
|
||||
.tc_source
|
||||
.tc_store
|
||||
.pool
|
||||
.write()
|
||||
.expect("Locking pool failed");
|
||||
let mut scheduler = self.scheduler.borrow_mut();
|
||||
scheduler
|
||||
.insert_wrapped_tc::<TimeProvider>(pus_tc, pool.as_mut())
|
||||
.expect("TODO: panic message");
|
||||
drop(scheduler);
|
||||
|
||||
self.verif_reporter
|
||||
.completion_success(start_token, Some(&self.time_stamp))
|
||||
.expect("Error sending completion success");
|
||||
}
|
||||
_ => {
|
||||
self.verif_reporter
|
||||
.start_failure(
|
||||
token,
|
||||
FailParams::new(
|
||||
Some(&self.time_stamp),
|
||||
lkoester marked this conversation as resolved
muellerr
commented
maybe this can be deleted as well? maybe this can be deleted as well?
|
||||
&tmtc_err::NOT_ENOUGH_APP_DATA,
|
||||
None,
|
||||
),
|
||||
)
|
||||
.expect("Sending start failure TM failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,12 @@
|
||||
use satrs_core::events::EventU32;
|
||||
use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer};
|
||||
use satrs_core::params::Params;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::net::SocketAddr;
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
@ -14,6 +16,7 @@ use crate::pus::PusReceiver;
|
||||
use crate::requests::RequestWithToken;
|
||||
use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
|
||||
use satrs_core::pus::event_man::EventRequestWithToken;
|
||||
use satrs_core::pus::scheduling::PusScheduler;
|
||||
use satrs_core::pus::verification::StdVerifReporterWithSender;
|
||||
use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader};
|
||||
use satrs_core::tmtc::{
|
||||
@ -41,6 +44,12 @@ pub struct TcArgs {
|
||||
pub tc_receiver: Receiver<StoreAddr>,
|
||||
}
|
||||
|
||||
impl TcArgs {
|
||||
fn split(self) -> (PusTcSource, Receiver<StoreAddr>) {
|
||||
(self.tc_source, self.tc_receiver)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum MpscStoreAndSendError {
|
||||
StoreError(StoreError),
|
||||
@ -152,6 +161,11 @@ impl ReceivesCcsdsTc for PusTcSource {
|
||||
}
|
||||
}
|
||||
pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
|
||||
let scheduler = Rc::new(RefCell::new(
|
||||
PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(),
|
||||
));
|
||||
|
||||
let sched_clone = scheduler.clone();
|
||||
let mut pus_receiver = PusReceiver::new(
|
||||
PUS_APID,
|
||||
tm_args.tm_sink_sender,
|
||||
@ -160,11 +174,15 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
|
||||
tc_args.tc_source.clone(),
|
||||
args.event_request_tx,
|
||||
args.request_map,
|
||||
sched_clone,
|
||||
);
|
||||
|
||||
let ccsds_receiver = CcsdsReceiver {
|
||||
tc_source: tc_args.tc_source.clone(),
|
||||
};
|
||||
|
||||
let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
|
||||
|
||||
let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor))
|
||||
.expect("Creating UDP TMTC server failed");
|
||||
|
||||
@ -173,8 +191,17 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
|
||||
tm_rx: tm_args.tm_server_rx,
|
||||
tm_store: tm_args.tm_store.pool.clone(),
|
||||
};
|
||||
|
||||
let mut tc_buf: [u8; 4096] = [0; 4096];
|
||||
loop {
|
||||
core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver);
|
||||
let tmtc_sched = scheduler.clone();
|
||||
core_tmtc_loop(
|
||||
&mut udp_tmtc_server,
|
||||
&mut tc_args,
|
||||
&mut tc_buf,
|
||||
&mut pus_receiver,
|
||||
tmtc_sched,
|
||||
);
|
||||
thread::sleep(Duration::from_millis(400));
|
||||
lkoester marked this conversation as resolved
Outdated
muellerr
commented
old code can be deleted old code can be deleted
|
||||
}
|
||||
}
|
||||
@ -182,8 +209,34 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
|
||||
fn core_tmtc_loop(
|
||||
udp_tmtc_server: &mut UdpTmtcServer,
|
||||
tc_args: &mut TcArgs,
|
||||
tc_buf: &mut [u8],
|
||||
pus_receiver: &mut PusReceiver,
|
||||
scheduler: Rc<RefCell<PusScheduler>>,
|
||||
) {
|
||||
let releaser = |enabled: bool, addr: &StoreAddr| -> bool {
|
||||
tc_args.tc_source.tc_source.send(*addr).is_ok()
|
||||
};
|
||||
lkoester marked this conversation as resolved
Outdated
muellerr
commented
unused code can be deleted unused code can be deleted
|
||||
|
||||
let mut pool = tc_args
|
||||
.tc_source
|
||||
.tc_store
|
||||
.pool
|
||||
.write()
|
||||
.expect("error locking pool");
|
||||
|
||||
let mut scheduler = scheduler.borrow_mut();
|
||||
scheduler.update_time_from_now().unwrap();
|
||||
match scheduler.release_telecommands(releaser, pool.as_mut()) {
|
||||
Ok(released_tcs) => {
|
||||
if released_tcs > 0 {
|
||||
println!("{} Tc(s) released from scheduler", released_tcs);
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
drop(pool);
|
||||
drop(scheduler);
|
||||
|
||||
while poll_tc_server(udp_tmtc_server) {}
|
||||
match tc_args.tc_receiver.try_recv() {
|
||||
Ok(addr) => {
|
||||
@ -194,7 +247,9 @@ fn core_tmtc_loop(
|
||||
.read()
|
||||
lkoester marked this conversation as resolved
Outdated
muellerr
commented
can be deleted can be deleted
|
||||
.expect("locking tc pool failed");
|
||||
let data = pool.read(&addr).expect("reading pool failed");
|
||||
match PusTc::from_bytes(data) {
|
||||
tc_buf[0..data.len()].copy_from_slice(data);
|
||||
drop(pool);
|
||||
match PusTc::from_bytes(tc_buf) {
|
||||
Ok((pus_tc, _)) => {
|
||||
pus_receiver
|
||||
.handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc)
|
||||
@ -202,7 +257,7 @@ fn core_tmtc_loop(
|
||||
}
|
||||
Err(e) => {
|
||||
println!("error creating PUS TC from raw data: {e}");
|
||||
println!("raw data: {data:x?}");
|
||||
println!("raw data: {tc_buf:x?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user
should be removed (or put behind std feature and `#[allow(unused_imports)]