Implementation of scheduler in pus and tmtc handler #29

Merged
muellerr merged 13 commits from pus_schedule_implementation into main 2023-02-01 13:40:49 +01:00
5 changed files with 702 additions and 71 deletions

View File

@ -3,12 +3,96 @@ use crate::pool::{PoolProvider, StoreAddr, StoreError};
use alloc::collections::btree_map::{Entry, Range};
use alloc::vec;
use alloc::vec::Vec;
use core::fmt::{Debug, Display, Formatter};
use core::time::Duration;
use spacepackets::time::UnixTimestamp;
use spacepackets::ecss::{PusError, PusPacket};
use spacepackets::tc::PusTc;
use spacepackets::time::cds::DaysLen24Bits;
use spacepackets::time::{CcsdsTimeProvider, TimeReader, TimestampError, UnixTimestamp};
use std::collections::BTreeMap;
#[cfg(feature = "std")]
use std::error::Error;
#[cfg(feature = "std")]
use std::time::SystemTimeError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleError {
PusError(PusError),
TimeMarginTooShort(UnixTimestamp, UnixTimestamp),
NestedScheduledTc,
StoreError(StoreError),
TcDataEmpty,
TimestampError(TimestampError),
WrongSubservice,
WrongService,
}
impl Display for ScheduleError {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
ScheduleError::PusError(e) => {
write!(f, "Pus Error: {}", e)
}
ScheduleError::TimeMarginTooShort(current_time, timestamp) => {
write!(
f,
"Error: time margin too short, current time: {:?}, time stamp: {:?}",
current_time, timestamp
)
}
ScheduleError::NestedScheduledTc => {
write!(f, "Error: nested scheduling is not allowed")
}
ScheduleError::StoreError(e) => {
write!(f, "Store Error: {}", e)
}
ScheduleError::TcDataEmpty => {
write!(f, "Error: empty Tc Data field")
}
ScheduleError::TimestampError(e) => {
write!(f, "Timestamp Error: {}", e)
}
ScheduleError::WrongService => {
write!(f, "Error: Service not 11.")
}
ScheduleError::WrongSubservice => {
write!(f, "Error: Subservice not 4.")
}
}
}
}
impl From<PusError> for ScheduleError {
fn from(e: PusError) -> Self {
ScheduleError::PusError(e)
}
}
impl From<StoreError> for ScheduleError {
fn from(e: StoreError) -> Self {
ScheduleError::StoreError(e)
}
}
impl From<TimestampError> for ScheduleError {
fn from(e: TimestampError) -> Self {
ScheduleError::TimestampError(e)
}
}
#[cfg(feature = "std")]
impl Error for ScheduleError {}
//TODO: Move to spacepackets
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ScheduleSubservice {
EnableScheduling = 1,
DisableScheduling = 2,
ResetScheduling = 3,
InsertActivity = 4,
DeleteActivity = 5,
}
/// This is the core data structure for scheduling PUS telecommands with [alloc] support.
///
/// It is assumed that the actual telecommand data is stored in a separate TC pool offering
@ -82,7 +166,8 @@ impl PusScheduler {
/// The holding store for the telecommands needs to be passed so all the stored telecommands
/// can be deleted to avoid a memory leak. If at last one deletion operation fails, the error
/// will be returned but the method will still try to delete all the commands in the schedule.
pub fn reset(&mut self, store: &mut impl PoolProvider) -> Result<(), StoreError> {
pub fn reset(&mut self, store: &mut (impl PoolProvider + ?Sized)) -> Result<(), StoreError> {
self.enabled = false;
let mut deletion_ok = Ok(());
for tc_lists in &mut self.tc_map {
@ -105,9 +190,16 @@ impl PusScheduler {
&self.current_time
}
pub fn insert_tc(&mut self, time_stamp: UnixTimestamp, addr: StoreAddr) -> bool {
pub fn insert_unwrapped_and_stored_tc(
&mut self,
time_stamp: UnixTimestamp,
addr: StoreAddr,
) -> Result<(), ScheduleError> {
if time_stamp < self.current_time + self.time_margin {
return false;
return Err(ScheduleError::TimeMarginTooShort(
self.current_time,
time_stamp,
));
}
match self.tc_map.entry(time_stamp) {
Entry::Vacant(e) => {
@ -117,7 +209,65 @@ impl PusScheduler {
v.get_mut().push(addr);
}
}
true
Ok(())
}
pub fn insert_unwrapped_tc(
&mut self,
time_stamp: UnixTimestamp,
tc: &[u8],
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<StoreAddr, ScheduleError> {
let check_tc = PusTc::from_bytes(tc)?;
if PusPacket::service(&check_tc.0) == 11 && PusPacket::subservice(&check_tc.0) == 4 {
return Err(ScheduleError::NestedScheduledTc);
}
match pool.add(tc) {
Ok(addr) => {
self.insert_unwrapped_and_stored_tc(time_stamp, addr)?;
Ok(addr)
}
Err(err) => Err(err.into()),
}
}
// <T: FnMut(&[u8]) -> (&dyn CcsdsTimeProvider)>
pub fn insert_wrapped_tc<TimeStamp: CcsdsTimeProvider + TimeReader>(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<StoreAddr, ScheduleError> {
if PusPacket::service(pus_tc) != 11 {
return Err(ScheduleError::WrongService);
}
if PusPacket::subservice(pus_tc) != 4 {
return Err(ScheduleError::WrongSubservice);
}
return if let Some(user_data) = pus_tc.user_data() {
let stamp: TimeStamp = TimeReader::from_bytes(user_data)?;
let unix_stamp = stamp.unix_stamp();
let stamp_len = stamp.len_as_bytes();
self.insert_unwrapped_tc(unix_stamp, &user_data[stamp_len..], pool)
} else {
Err(ScheduleError::TcDataEmpty)
};
}
pub fn insert_wrapped_tc_cds_short(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<StoreAddr, ScheduleError> {
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(pus_tc, pool)
}
pub fn insert_wrapped_tc_cds_long(
&mut self,
pus_tc: &PusTc,
pool: &mut (impl PoolProvider + ?Sized),
) -> Result<StoreAddr, ScheduleError> {
self.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider<DaysLen24Bits>>(pus_tc, pool)
}
pub fn telecommands_to_release(&self) -> Range<'_, UnixTimestamp, Vec<StoreAddr>> {
@ -133,18 +283,19 @@ impl PusScheduler {
/// Utility method which calls [Self::telecommands_to_release] and then calls a releaser
/// closure for each telecommand which should be released. This function will also delete
/// the telecommands from the holding store after calling the release closure.
/// the telecommands from the holding store after calling the release closure, if the scheduler
/// is disabled.
///
/// # Arguments
///
/// * `releaser` - Closure where the first argument is whether the scheduler is enabled and
/// the second argument is the store address. This closure should return whether the
/// command should be deleted.
/// command should be deleted if the scheduler is disabled to prevent memory leaks.
/// * `store` - The holding store of the telecommands.
pub fn release_telecommands<R: FnMut(bool, &StoreAddr) -> bool>(
&mut self,
mut releaser: R,
tc_store: &mut impl PoolProvider,
tc_store: &mut (impl PoolProvider + ?Sized),
) -> Result<u64, (u64, StoreError)> {
let tcs_to_release = self.telecommands_to_release();
let mut released_tcs = 0;
@ -153,7 +304,7 @@ impl PusScheduler {
for addr in tc.1 {
let should_delete = releaser(self.enabled, addr);
released_tcs += 1;
if should_delete {
if should_delete && !self.is_enabled() {
let res = tc_store.delete(*addr);
if res.is_err() {
store_error = res;
@ -171,10 +322,20 @@ impl PusScheduler {
#[cfg(test)]
mod tests {
use crate::pool::{LocalPool, PoolCfg, PoolProvider, StoreAddr};
use crate::pus::scheduling::PusScheduler;
use alloc::vec::Vec;
use spacepackets::time::UnixTimestamp;
use crate::pus::scheduling::{PusScheduler, ScheduleError};
use crate::tmtc::ccsds_distrib::tests::generate_ping_tc;
use alloc::rc::Rc;
use core::borrow::BorrowMut;
use spacepackets::ecss::PacketTypeCodes::UnsignedInt;
use spacepackets::ecss::PusPacket;
use spacepackets::tc::PusTc;
use spacepackets::time::{cds, TimeWriter, UnixTimestamp};
use spacepackets::{CcsdsPacket, SpHeader};
use std::cell::RefCell;
use std::sync::mpsc;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::time::Duration;
use std::vec::Vec;
#[allow(unused_imports)]
use std::{println, vec};
@ -194,19 +355,29 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let first_addr = pool.add(&[0, 1, 2]).unwrap();
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr.clone());
assert!(worked);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
first_addr.clone(),
)
.unwrap();
let second_addr = pool.add(&[2, 3, 4]).unwrap();
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr.clone());
assert!(worked);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(200),
second_addr.clone(),
)
.unwrap();
let third_addr = pool.add(&[5, 6, 7]).unwrap();
let worked = scheduler.insert_tc(UnixTimestamp::new_only_seconds(300), third_addr.clone());
assert!(worked);
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(300),
third_addr.clone(),
)
.unwrap();
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
assert!(scheduler.is_enabled());
@ -223,35 +394,35 @@ mod tests {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let worked = scheduler.insert_tc(
scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
StoreAddr {
pool_idx: 0,
packet_idx: 1,
},
);
)
.unwrap();
assert!(worked);
let worked = scheduler.insert_tc(
let worked = scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(100),
StoreAddr {
pool_idx: 0,
packet_idx: 2,
},
);
)
.unwrap();
assert!(worked);
let worked = scheduler.insert_tc(
let worked = scheduler
.insert_unwrapped_and_stored_tc(
UnixTimestamp::new_only_seconds(300),
StoreAddr {
pool_idx: 0,
packet_idx: 2,
},
);
assert!(worked);
)
.unwrap();
assert_eq!(scheduler.num_scheduled_telecommands(), 3);
}
@ -275,6 +446,17 @@ mod tests {
assert!(expected_store_addrs.contains(store_addr));
*counter += 1;
}
fn common_check_disabled(
enabled: bool,
store_addr: &StoreAddr,
expected_store_addrs: Vec<StoreAddr>,
counter: &mut usize,
) {
assert_eq!(enabled, false);
assert!(expected_store_addrs.contains(store_addr));
*counter += 1;
}
#[test]
fn release_basic() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
@ -282,10 +464,11 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let first_addr = pool.add(&[2, 2, 2]).unwrap();
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr);
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
let second_addr = pool.add(&[5, 6, 7]).unwrap();
scheduler.insert_tc(UnixTimestamp::new_only_seconds(200), second_addr);
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr);
let mut i = 0;
let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| {
@ -307,7 +490,7 @@ mod tests {
.release_telecommands(&mut test_closure_1, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&first_addr).unwrap());
assert!(pool.has_element_at(&first_addr).unwrap());
// test 3, late timestamp, release 1 overdue tc
let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
@ -321,7 +504,7 @@ mod tests {
.release_telecommands(&mut test_closure_2, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&second_addr).unwrap());
assert!(pool.has_element_at(&second_addr).unwrap());
//test 4: no tcs left
scheduler
@ -339,10 +522,11 @@ mod tests {
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let first_addr = pool.add(&[2, 2, 2]).unwrap();
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), first_addr);
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
let second_addr = pool.add(&[2, 2, 2]).unwrap();
scheduler.insert_tc(UnixTimestamp::new_only_seconds(100), second_addr);
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), second_addr);
let mut i = 0;
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
@ -365,8 +549,8 @@ mod tests {
.release_telecommands(&mut test_closure, &mut pool)
.expect("deletion failed");
assert_eq!(released, 2);
assert!(!pool.has_element_at(&first_addr).unwrap());
assert!(!pool.has_element_at(&second_addr).unwrap());
assert!(pool.has_element_at(&first_addr).unwrap());
assert!(pool.has_element_at(&second_addr).unwrap());
//test 3: no tcs left
released = scheduler
@ -377,4 +561,261 @@ mod tests {
// check that 2 total tcs have been released
assert_eq!(i, 2);
}
#[test]
fn release_with_scheduler_disabled() {
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
scheduler.disable();
let first_addr = pool.add(&[2, 2, 2]).unwrap();
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(100), first_addr);
let second_addr = pool.add(&[5, 6, 7]).unwrap();
scheduler.insert_unwrapped_and_stored_tc(UnixTimestamp::new_only_seconds(200), second_addr);
let mut i = 0;
let mut test_closure_1 = |boolvar: bool, store_addr: &StoreAddr| {
common_check_disabled(boolvar, store_addr, vec![first_addr], &mut i);
true
};
// test 1: too early, no tcs
scheduler.update_time(UnixTimestamp::new_only_seconds(99));
scheduler
.release_telecommands(&mut test_closure_1, &mut pool)
.expect("deletion failed");
// test 2: exact time stamp of tc, releases 1 tc
scheduler.update_time(UnixTimestamp::new_only_seconds(100));
let mut released = scheduler
.release_telecommands(&mut test_closure_1, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&first_addr).unwrap());
// test 3, late timestamp, release 1 overdue tc
let mut test_closure_2 = |boolvar: bool, store_addr: &StoreAddr| {
common_check_disabled(boolvar, store_addr, vec![second_addr], &mut i);
true
};
scheduler.update_time(UnixTimestamp::new_only_seconds(206));
released = scheduler
.release_telecommands(&mut test_closure_2, &mut pool)
.expect("deletion failed");
assert_eq!(released, 1);
assert!(!pool.has_element_at(&second_addr).unwrap());
//test 4: no tcs left
scheduler
.release_telecommands(&mut test_closure_2, &mut pool)
.expect("deletion failed");
// check that 2 total tcs have been released
assert_eq!(i, 2);
}
fn scheduled_tc(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap();
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
let len_packet = base_ping_tc_simple_ctor()
.write_to_bytes(&mut buf[len_time_stamp..])
.unwrap();
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
PusTc::new_simple(
&mut sph,
11,
4,
Some(&buf[..len_packet + len_time_stamp]),
true,
)
}
fn wrong_tc_service(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap();
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
let len_packet = base_ping_tc_simple_ctor()
.write_to_bytes(&mut buf[len_time_stamp..])
.unwrap();
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
PusTc::new_simple(
&mut sph,
12,
4,
Some(&buf[..len_packet + len_time_stamp]),
true,
)
}
fn wrong_tc_subservice(timestamp: UnixTimestamp, buf: &mut [u8]) -> PusTc {
let cds_time = cds::TimeProvider::from_unix_secs_with_u16_days(&timestamp).unwrap();
let len_time_stamp = cds_time.write_to_bytes(buf).unwrap();
let len_packet = base_ping_tc_simple_ctor()
.write_to_bytes(&mut buf[len_time_stamp..])
.unwrap();
let mut sph = SpHeader::tc_unseg(0x02, 0x34, len_packet as u16).unwrap();
PusTc::new_simple(
&mut sph,
11,
5,
Some(&buf[..len_packet + len_time_stamp]),
true,
)
}
fn base_ping_tc_simple_ctor() -> PusTc<'static> {
let mut sph = SpHeader::tc_unseg(0x02, 0x34, 0).unwrap();
PusTc::new_simple(&mut sph, 17, 1, None, true)
}
#[test]
fn insert_unwrapped_tc() {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let len = base_ping_tc_simple_ctor().write_to_bytes(&mut buf).unwrap();
let addr = scheduler
.insert_unwrapped_tc(UnixTimestamp::new_only_seconds(100), &buf[..len], &mut pool)
.unwrap();
assert!(pool.has_element_at(&addr).unwrap());
let data = pool.read(&addr).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
scheduler.update_time(UnixTimestamp::new_only_seconds(101));
let mut addr_vec = vec::Vec::new();
let mut i = 0;
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
common_check(boolvar, store_addr, vec![addr], &mut i);
// check that tc remains unchanged
addr_vec.push(*store_addr);
false
};
scheduler
.release_telecommands(&mut test_closure, &mut pool)
.unwrap();
let data = pool.read(&addr_vec[0]).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
}
#[test]
fn insert_wrapped_tc() {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let tc = scheduled_tc(UnixTimestamp::new_only_seconds(100), &mut buf);
let addr = match scheduler
.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool)
{
Ok(addr) => addr,
Err(e) => {
println!("{}", e);
panic!();
}
};
assert!(pool.has_element_at(&addr).unwrap());
let data = pool.read(&addr).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
assert_eq!(scheduler.num_scheduled_telecommands(), 1);
scheduler.update_time(UnixTimestamp::new_only_seconds(101));
let mut addr_vec = vec::Vec::new();
let mut i = 0;
let mut test_closure = |boolvar: bool, store_addr: &StoreAddr| {
common_check(boolvar, store_addr, vec![addr], &mut i);
// check that tc remains unchanged
addr_vec.push(*store_addr);
false
};
scheduler
.release_telecommands(&mut test_closure, &mut pool)
.unwrap();
let data = pool.read(&addr_vec[0]).unwrap();
let check_tc = PusTc::from_bytes(&data).expect("incorrect Pus tc raw data");
assert_eq!(check_tc.0, base_ping_tc_simple_ctor());
}
#[test]
fn insert_wrong_service() {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let tc = wrong_tc_service(UnixTimestamp::new_only_seconds(100), &mut buf);
match scheduler.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool) {
Ok(_) => {
panic!();
}
Err(e) => {
if e != ScheduleError::WrongService {
panic!();
}
}
}
}
#[test]
fn insert_wrong_subservice() {
let mut scheduler =
PusScheduler::new(UnixTimestamp::new_only_seconds(0), Duration::from_secs(5));
let mut pool = LocalPool::new(PoolCfg::new(vec![(10, 32), (5, 64)]));
let mut buf: [u8; 32] = [0; 32];
let tc = wrong_tc_subservice(UnixTimestamp::new_only_seconds(100), &mut buf);
match scheduler.insert_wrapped_tc::<spacepackets::time::cds::TimeProvider>(&tc, &mut pool) {
Ok(_) => {
panic!();
}
Err(e) => {
if e != ScheduleError::WrongSubservice {
panic!();
}
}
}
}
}

View File

@ -5,18 +5,21 @@ import struct
import sys
import time
from typing import Optional
import datetime
import tmtccmd
from spacepackets.ecss import PusTelemetry, PusTelecommand, PusVerificator
from spacepackets.ecss.pus_17_test import Service17Tm
from spacepackets.ecss.pus_1_verification import UnpackParams, Service1Tm
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd import CcsdsTmtcBackend, TcHandlerBase, ProcedureParamsWrapper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_11_tc_sched import create_time_tagged_cmd
from tmtccmd.core.base import BackendRequest
from tmtccmd.pus import VerificationWrapper
from tmtccmd.tm import CcsdsTmHandler, SpecificApidHandlerBase
from tmtccmd.com_if import ComInterface
from tmtccmd.com import ComInterface
from tmtccmd.config import (
default_json_path,
SetupParams,
@ -41,7 +44,7 @@ from tmtccmd.tc import (
SendCbParams,
DefaultPusQueueHelper,
)
from tmtccmd.tm.pus_5_event import Service5Tm
from tmtccmd.tm.pus_5_fsfw_event import Service5Tm
from tmtccmd.util import FileSeqCountProvider, PusFileSeqCountProvider
from tmtccmd.util.obj_id import ObjectIdDictT
@ -57,7 +60,7 @@ class SatRsConfigHook(TmTcCfgHookBase):
super().__init__(json_cfg_path=json_cfg_path)
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com_if import (
from tmtccmd.config.com import (
create_com_interface_default,
create_com_interface_cfg_default,
)
@ -94,6 +97,13 @@ class SatRsConfigHook(TmTcCfgHookBase):
info="PUS Service 3 Housekeeping",
op_code_entry=srv_3
)
srv_11 = OpCodeEntry()
srv_11.add("0", "Scheduled TC Test")
defs.add_service(
name=CoreServiceList.SERVICE_11,
info="PUS Service 11 TC Scheduling",
op_code_entry=srv_11,
)
return defs
def perform_mode_operation(self, tmtc_backend: CcsdsTmtcBackend, mode: int):
@ -120,7 +130,7 @@ class PusHandler(SpecificApidHandlerBase):
def handle_tm(self, packet: bytes, _user_args: any):
try:
tm_packet = PusTelemetry.unpack(packet)
tm_packet = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
except ValueError as e:
LOGGER.warning("Could not generate PUS TM object from raw data")
LOGGER.warning(f"Raw Packet: [{packet.hex(sep=',')}], REPR: {packet!r}")
@ -128,7 +138,7 @@ class PusHandler(SpecificApidHandlerBase):
service = tm_packet.service
dedicated_handler = False
if service == 1:
tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(1, 2))
tm_packet = Service1Tm.unpack(data=packet, params=UnpackParams(CdsShortTimestamp.empty(), 1, 2))
res = self.verif_wrapper.add_tm(tm_packet)
if res is None:
LOGGER.info(
@ -145,16 +155,16 @@ class PusHandler(SpecificApidHandlerBase):
if service == 3:
LOGGER.info("No handling for HK packets implemented")
LOGGER.info(f"Raw packet: 0x[{packet.hex(sep=',')}]")
pus_tm = PusTelemetry.unpack(packet)
pus_tm = PusTelemetry.unpack(packet, time_reader=CdsShortTimestamp.empty())
if pus_tm.subservice == 25:
if len(pus_tm.source_data) < 8:
raise ValueError("No addressable ID in HK packet")
json_str = pus_tm.source_data[8:]
dedicated_handler = True
if service == 5:
tm_packet = Service5Tm.unpack(packet)
tm_packet = Service5Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
if service == 17:
tm_packet = Service17Tm.unpack(packet)
tm_packet = Service17Tm.unpack(packet, time_reader=CdsShortTimestamp.empty())
dedicated_handler = True
if tm_packet.subservice == 2:
self.printer.file_logger.info("Received Ping Reply TM[17,2]")
@ -205,7 +215,10 @@ class TcHandler(TcHandlerBase):
self.verif_wrapper = verif_wrapper
self.queue_helper = DefaultPusQueueHelper(
queue_wrapper=None,
tc_sched_timestamp_len=CdsShortTimestamp.TIMESTAMP_SIZE,
seq_cnt_provider=seq_count_provider,
pus_verificator=self.verif_wrapper.pus_verificator,
default_pus_apid=EXAMPLE_PUS_APID
)
def send_cb(self, send_params: SendCbParams):
@ -213,10 +226,6 @@ class TcHandler(TcHandlerBase):
if entry_helper.is_tc:
if entry_helper.entry_type == TcQueueEntryType.PUS_TC:
pus_tc_wrapper = entry_helper.to_pus_tc_entry()
pus_tc_wrapper.pus_tc.seq_count = (
self.seq_count_provider.get_and_increment()
)
self.verif_wrapper.add_tc(pus_tc_wrapper.pus_tc)
raw_tc = pus_tc_wrapper.pus_tc.pack()
LOGGER.info(f"Sending {pus_tc_wrapper.pus_tc}")
send_params.com_if.send(raw_tc)
@ -247,6 +256,14 @@ class TcHandler(TcHandlerBase):
return q.add_pus_tc(
PusTelecommand(service=17, subservice=1)
)
if service == CoreServiceList.SERVICE_11:
q.add_log_cmd("Sending PUS scheduled TC telecommand")
crt_time = CdsShortTimestamp.from_now()
time_stamp = crt_time + datetime.timedelta(seconds=10)
time_stamp = time_stamp.pack()
return q.add_pus_tc(
create_time_tagged_cmd(time_stamp, PusTelecommand(service=17, subservice=1), apid=EXAMPLE_PUS_APID)
)
if service == CoreServiceList.SERVICE_3:
if op_code in HkOpCodes.GENERATE_ONE_SHOT:
q.add_log_cmd("Sending HK one shot request")

View File

@ -1 +1,2 @@
tmtccmd == 3.0.0
# tmtccmd == 4.0.0a2
-e git+https://github.com/robamu-org/tmtccmd@97e5e51101a08b21472b3ddecc2063359f7e307a#egg=tmtccmd

View File

@ -6,6 +6,7 @@ use satrs_core::pool::StoreAddr;
use satrs_core::pus::event::Subservices;
use satrs_core::pus::event_man::{EventRequest, EventRequestWithToken};
use satrs_core::pus::hk;
use satrs_core::pus::scheduling::PusScheduler;
use satrs_core::pus::verification::{
FailParams, StdVerifReporterWithSender, TcStateAccepted, VerificationToken,
};
@ -17,7 +18,9 @@ use satrs_core::{
spacepackets::time::TimeWriter, spacepackets::SpHeader,
};
use satrs_example::{hk_err, tmtc_err};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::mpsc::Sender;
pub struct PusReceiver {
@ -31,6 +34,7 @@ pub struct PusReceiver {
request_map: HashMap<u32, Sender<RequestWithToken>>,
stamper: TimeProvider,
time_stamp: [u8; 7],
scheduler: Rc<RefCell<PusScheduler>>,
}
impl PusReceiver {
@ -42,6 +46,7 @@ impl PusReceiver {
tc_source: PusTcSource,
event_request_tx: Sender<EventRequestWithToken>,
request_map: HashMap<u32, Sender<RequestWithToken>>,
scheduler: Rc<RefCell<PusScheduler>>,
) -> Self {
Self {
tm_helper: PusTmWithCdsShortHelper::new(apid),
@ -53,6 +58,7 @@ impl PusReceiver {
request_map,
stamper: TimeProvider::new_with_u16_days(0, 0),
time_stamp: [0; 7],
scheduler,
}
}
}
@ -78,6 +84,8 @@ impl PusServiceProvider for PusReceiver {
self.handle_event_request(pus_tc, accepted_token);
} else if service == 3 {
self.handle_hk_request(pus_tc, accepted_token);
} else if service == 11 {
self.handle_scheduled_tc(pus_tc, accepted_token);
} else {
self.update_time_stamp();
self.verif_reporter
@ -201,6 +209,7 @@ impl PusReceiver {
));
}
}
fn handle_event_request(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
let send_start_failure = |verif_reporter: &mut StdVerifReporterWithSender,
timestamp: &[u8; 7],
@ -273,4 +282,112 @@ impl PusReceiver {
}
}
}
fn handle_scheduled_tc(&mut self, pus_tc: &PusTc, token: VerificationToken<TcStateAccepted>) {
if pus_tc.user_data().is_none() {
self.update_time_stamp();
self.verif_reporter
.start_failure(
token,
FailParams::new(Some(&self.time_stamp), &tmtc_err::NOT_ENOUGH_APP_DATA, None),
)
.expect("Sending start failure TM failed");
return;
}
self.update_time_stamp();
match pus_tc.subservice() {
1 => {
let start_token = self
.verif_reporter
.start_success(token, Some(&self.time_stamp))
.expect("Error sending start success");
let mut scheduler = self.scheduler.borrow_mut();
scheduler.enable();
if scheduler.is_enabled() {
self.verif_reporter
.completion_success(start_token, Some(&self.time_stamp))
.expect("Error sending completion success");
} else {
panic!("Failed to enable scheduler");
}
drop(scheduler);
}
2 => {
let start_token = self
.verif_reporter
.start_success(token, Some(&self.time_stamp))
.expect("Error sending start success");
let mut scheduler = self.scheduler.borrow_mut();
scheduler.disable();
if !scheduler.is_enabled() {
self.verif_reporter
.completion_success(start_token, Some(&self.time_stamp))
.expect("Error sending completion success");
} else {
panic!("Failed to disable scheduler");
}
drop(scheduler);
}
3 => {
let start_token = self
.verif_reporter
.start_success(token, Some(&self.time_stamp))
.expect("Error sending start success");
let mut pool = self
.tc_source
.tc_store
.pool
.write()
.expect("Locking pool failed");
let mut scheduler = self.scheduler.borrow_mut();
scheduler
.reset(pool.as_mut())
.expect("Error resetting TC Pool");
drop(scheduler);
self.verif_reporter
.completion_success(start_token, Some(&self.time_stamp))
.expect("Error sending completion success");
}
4 => {
let start_token = self
.verif_reporter
.start_success(token, Some(&self.time_stamp))
.expect("Error sending start success");
let mut pool = self
.tc_source
.tc_store
.pool
.write()
.expect("Locking pool failed");
let mut scheduler = self.scheduler.borrow_mut();
scheduler
.insert_wrapped_tc::<TimeProvider>(pus_tc, pool.as_mut())
.expect("TODO: panic message");
drop(scheduler);
self.verif_reporter
.completion_success(start_token, Some(&self.time_stamp))
.expect("Error sending completion success");
}
_ => {
self.verif_reporter
.start_failure(
token,
FailParams::new(
Some(&self.time_stamp),
lkoester marked this conversation as resolved
Review

maybe this can be deleted as well?

maybe this can be deleted as well?
&tmtc_err::NOT_ENOUGH_APP_DATA,
None,
),
)
.expect("Sending start failure TM failed");
}
}
}
}

View File

@ -1,10 +1,12 @@
use satrs_core::events::EventU32;
use satrs_core::hal::host::udp_server::{ReceiveResult, UdpTcServer};
use satrs_core::params::Params;
use std::cell::RefCell;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::mpsc::{Receiver, SendError, Sender, TryRecvError};
use std::thread;
use std::time::Duration;
@ -14,6 +16,7 @@ use crate::pus::PusReceiver;
use crate::requests::RequestWithToken;
use satrs_core::pool::{SharedPool, StoreAddr, StoreError};
use satrs_core::pus::event_man::EventRequestWithToken;
use satrs_core::pus::scheduling::PusScheduler;
use satrs_core::pus::verification::StdVerifReporterWithSender;
use satrs_core::spacepackets::{ecss::PusPacket, tc::PusTc, tm::PusTm, SpHeader};
use satrs_core::tmtc::{
@ -41,6 +44,12 @@ pub struct TcArgs {
pub tc_receiver: Receiver<StoreAddr>,
}
impl TcArgs {
fn split(self) -> (PusTcSource, Receiver<StoreAddr>) {
(self.tc_source, self.tc_receiver)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MpscStoreAndSendError {
StoreError(StoreError),
@ -152,6 +161,11 @@ impl ReceivesCcsdsTc for PusTcSource {
}
}
pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
let scheduler = Rc::new(RefCell::new(
PusScheduler::new_with_current_init_time(Duration::from_secs(5)).unwrap(),
));
let sched_clone = scheduler.clone();
let mut pus_receiver = PusReceiver::new(
PUS_APID,
tm_args.tm_sink_sender,
@ -160,11 +174,15 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
tc_args.tc_source.clone(),
args.event_request_tx,
args.request_map,
sched_clone,
);
let ccsds_receiver = CcsdsReceiver {
tc_source: tc_args.tc_source.clone(),
};
let ccsds_distributor = CcsdsDistributor::new(Box::new(ccsds_receiver));
let udp_tc_server = UdpTcServer::new(args.sock_addr, 2048, Box::new(ccsds_distributor))
.expect("Creating UDP TMTC server failed");
@ -173,8 +191,17 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
tm_rx: tm_args.tm_server_rx,
tm_store: tm_args.tm_store.pool.clone(),
};
let mut tc_buf: [u8; 4096] = [0; 4096];
loop {
core_tmtc_loop(&mut udp_tmtc_server, &mut tc_args, &mut pus_receiver);
let tmtc_sched = scheduler.clone();
core_tmtc_loop(
&mut udp_tmtc_server,
&mut tc_args,
&mut tc_buf,
&mut pus_receiver,
tmtc_sched,
);
thread::sleep(Duration::from_millis(400));
}
}
@ -182,8 +209,34 @@ pub fn core_tmtc_task(args: OtherArgs, mut tc_args: TcArgs, tm_args: TmArgs) {
fn core_tmtc_loop(
udp_tmtc_server: &mut UdpTmtcServer,
tc_args: &mut TcArgs,
tc_buf: &mut [u8],
pus_receiver: &mut PusReceiver,
scheduler: Rc<RefCell<PusScheduler>>,
) {
let releaser = |enabled: bool, addr: &StoreAddr| -> bool {
tc_args.tc_source.tc_source.send(*addr).is_ok()
};
let mut pool = tc_args
.tc_source
.tc_store
.pool
.write()
.expect("error locking pool");
let mut scheduler = scheduler.borrow_mut();
scheduler.update_time_from_now().unwrap();
match scheduler.release_telecommands(releaser, pool.as_mut()) {
Ok(released_tcs) => {
if released_tcs > 0 {
println!("{} Tc(s) released from scheduler", released_tcs);
}
}
Err(_) => {}
}
drop(pool);
drop(scheduler);
while poll_tc_server(udp_tmtc_server) {}
match tc_args.tc_receiver.try_recv() {
Ok(addr) => {
@ -194,7 +247,9 @@ fn core_tmtc_loop(
.read()
.expect("locking tc pool failed");
let data = pool.read(&addr).expect("reading pool failed");
match PusTc::from_bytes(data) {
tc_buf[0..data.len()].copy_from_slice(data);
drop(pool);
match PusTc::from_bytes(tc_buf) {
Ok((pus_tc, _)) => {
pus_receiver
.handle_pus_tc_packet(pus_tc.service(), pus_tc.sp_header(), &pus_tc)
@ -202,7 +257,7 @@ fn core_tmtc_loop(
}
Err(e) => {
println!("error creating PUS TC from raw data: {e}");
println!("raw data: {data:x?}");
println!("raw data: {tc_buf:x?}");
}
}
}