add event management, small fix for CAM handler loop #17

Merged
muellerr merged 1 commits from events-small-bugfix into main 2024-04-24 20:04:02 +02:00
11 changed files with 449 additions and 57 deletions

8
Cargo.lock generated
View File

@ -733,9 +733,9 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
[[package]] [[package]]
name = "satrs" name = "satrs"
version = "0.2.0-rc.3" version = "0.2.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aa9241e4d6cb0cc395927cfe653d8bc4a9cb6b2c27f28fec713d5e6ceb0ba23" checksum = "2adc1d9369e3f7e21dabb3181e36c914d1a3f68f4900207a2baa129c2fd5baba"
dependencies = [ dependencies = [
"bus", "bus",
"cobs", "cobs",
@ -783,9 +783,9 @@ dependencies = [
[[package]] [[package]]
name = "satrs-shared" name = "satrs-shared"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f01df804b902334a23c539db5e37f11bf41ecb86596292e7cc091628bf2c4f67" checksum = "6042477018c2d43fffccaaa5099bc299a58485139b4d31c5b276889311e474f1"
dependencies = [ dependencies = [
"serde", "serde",
"spacepackets", "spacepackets",

View File

@ -23,7 +23,7 @@ homedir = "0.2"
socket2 = "0.5" socket2 = "0.5"
[dependencies.satrs] [dependencies.satrs]
version = "0.2.0-rc.3" version = "0.2.0-rc.5"
# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git" # git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
# branch = "main" # branch = "main"
features = ["test_util"] features = ["test_util"]

View File

@ -1,5 +1,6 @@
use lazy_static::lazy_static; use lazy_static::lazy_static;
use num_enum::{IntoPrimitive, TryFromPrimitive}; use num_enum::{IntoPrimitive, TryFromPrimitive};
use satrs::events::{EventU32TypedSev, SeverityInfo};
use satrs::spacepackets::PacketId; use satrs::spacepackets::PacketId;
use satrs_mib::res_code::ResultU16Info; use satrs_mib::res_code::ResultU16Info;
use satrs_mib::resultcode; use satrs_mib::resultcode;
@ -39,6 +40,9 @@ pub enum GroupId {
Action = 3, Action = 3,
} }
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
EventU32TypedSev::<SeverityInfo>::new(GroupId::Tmtc as u16, 0);
lazy_static! { lazy_static! {
pub static ref HOME_PATH: PathBuf = { pub static ref HOME_PATH: PathBuf = {
let mut home_path = PathBuf::new(); let mut home_path = PathBuf::new();
@ -280,6 +284,7 @@ pub mod tasks {
pub const FREQ_MS_AOCS: u64 = 500; pub const FREQ_MS_AOCS: u64 = 500;
pub const FREQ_MS_PUS_STACK: u64 = 200; pub const FREQ_MS_PUS_STACK: u64 = 200;
pub const FREQ_MS_CTRL: u64 = 400; pub const FREQ_MS_CTRL: u64 = 400;
pub const FREQ_MS_CAMERA_HANDLING: u64 = 400;
pub const STOP_CHECK_FREQUENCY_MS: u64 = 400; pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS); pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);

287
src/events.rs Normal file
View File

@ -0,0 +1,287 @@
use std::sync::mpsc::{self};
use crate::pus::create_verification_reporter;
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::event_man::{EventMessageU32, EventRoutingError};
use satrs::params::WritableToBeBytes;
use satrs::pus::event::EventTmHookProvider;
use satrs::pus::verification::VerificationReporter;
use satrs::request::UniqueApidTargetId;
use satrs::tmtc::PacketAsVec;
use satrs::{
event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
pus::{
event_man::{
DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
},
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
},
spacepackets::time::cds::CdsTime,
};
use ops_sat_rs::update_time;
// This helper sets the APID of the event sender for the PUS telemetry.
#[derive(Default)]
pub struct EventApidSetter {
pub next_apid: u16,
}
impl EventTmHookProvider for EventApidSetter {
fn modify_tm(&self, tm: &mut satrs::spacepackets::ecss::tm::PusTmCreator) {
tm.set_apid(self.next_apid);
}
}
/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event
/// packets. It also handles the verification completion of PUS event service requests.
pub struct PusEventHandler {
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
tm_sender: mpsc::Sender<PacketAsVec>,
time_provider: CdsTime,
timestamp: [u8; 7],
verif_handler: VerificationReporter,
}
impl PusEventHandler {
pub fn new(
tm_sender: mpsc::Sender<PacketAsVec>,
verif_handler: VerificationReporter,
event_manager: &mut EventManagerWithBoundedMpsc,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let event_queue_cap = 30;
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap);
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
// telemetry for each event.
let event_reporter = EventReporter::new_with_hook(
PUS_EVENT_MANAGEMENT.raw(),
0,
0,
128,
EventApidSetter::default(),
)
.unwrap();
let pus_event_dispatcher =
DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
PUS_EVENT_MANAGEMENT.raw(),
pus_event_man_tx,
event_queue_cap,
);
event_manager.subscribe_all(pus_event_man_send_provider.target_id());
event_manager.add_sender(pus_event_man_send_provider);
Self {
event_request_rx,
pus_event_tm_creator: pus_event_dispatcher,
pus_event_man_rx,
time_provider: CdsTime::new_with_u16_days(0, 0),
timestamp: [0; 7],
verif_handler,
tm_sender,
}
}
pub fn handle_event_requests(&mut self) {
let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
let started_token: VerificationToken<TcStateStarted> = event_req
.token
.try_into()
.expect("expected start verification token");
self.verif_handler
.completion_success(&self.tm_sender, started_token, timestamp)
.expect("Sending completion success failed");
};
loop {
// handle event requests
match self.event_request_rx.try_recv() {
Ok(event_req) => match event_req.request {
EventRequest::Enable(event) => {
self.pus_event_tm_creator
.enable_tm_for_event(&event)
.expect("Enabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
EventRequest::Disable(event) => {
self.pus_event_tm_creator
.disable_tm_for_event(&event)
.expect("Disabling TM failed");
update_time(&mut self.time_provider, &mut self.timestamp);
report_completion(event_req, &self.timestamp);
}
},
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("all event request senders have disconnected");
break;
}
},
}
}
}
pub fn generate_pus_event_tm(&mut self) {
loop {
// Perform the generation of PUS event packets
match self.pus_event_man_rx.try_recv() {
Ok(event_msg) => {
update_time(&mut self.time_provider, &mut self.timestamp);
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
param.to_vec().expect("failed to convert params to vec")
});
// We use the TM modification hook to set the sender APID for each event.
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
UniqueApidTargetId::from(event_msg.sender_id()).apid;
self.pus_event_tm_creator
.generate_pus_event_tm_generic(
&self.tm_sender,
&self.timestamp,
event_msg.event(),
Some(&param_vec),
)
.expect("Sending TM as event failed");
}
Err(e) => match e {
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
log::warn!("All event senders have disconnected");
break;
}
},
}
}
}
}
pub struct EventHandler {
pub pus_event_handler: PusEventHandler,
event_manager: EventManagerWithBoundedMpsc,
}
impl EventHandler {
pub fn new(
tm_sender: mpsc::Sender<PacketAsVec>,
event_rx: mpsc::Receiver<EventMessageU32>,
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
) -> Self {
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
let pus_event_handler = PusEventHandler::new(
tm_sender,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
&mut event_manager,
event_request_rx,
);
Self {
pus_event_handler,
event_manager,
}
}
#[allow(dead_code)]
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
&mut self.event_manager
}
pub fn periodic_operation(&mut self) {
self.pus_event_handler.handle_event_requests();
self.try_event_routing();
self.pus_event_handler.generate_pus_event_tm();
}
pub fn try_event_routing(&mut self) {
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
self.routing_error_handler(event_msg, error)
};
// Perform the event routing.
self.event_manager.try_event_handling(error_handler);
}
pub fn routing_error_handler(&self, event_msg: &EventMessageU32, error: EventRoutingError) {
log::warn!("event routing error for event {event_msg:?}: {error:?}");
}
}
#[cfg(test)]
mod tests {
use satrs::{
events::EventU32,
pus::verification::VerificationReporterCfg,
spacepackets::{
ecss::{tm::PusTmReader, PusPacket},
CcsdsPacket,
},
tmtc::PacketAsVec,
};
use super::*;
const TEST_CREATOR_ID: UniqueApidTargetId = UniqueApidTargetId::new(1, 2);
const TEST_EVENT: EventU32 = EventU32::new(satrs::events::Severity::Info, 1, 1);
pub struct EventManagementTestbench {
pub event_tx: mpsc::SyncSender<EventMessageU32>,
pub event_manager: EventManagerWithBoundedMpsc,
pub tm_receiver: mpsc::Receiver<PacketAsVec>,
pub pus_event_handler: PusEventHandler,
}
impl EventManagementTestbench {
pub fn new() -> Self {
let (event_tx, event_rx) = mpsc::sync_channel(10);
let (_event_req_tx, event_req_rx) = mpsc::sync_channel(10);
let (tm_sender, tm_receiver) = mpsc::channel();
let verif_reporter_cfg = VerificationReporterCfg::new(0x05, 2, 2, 128).unwrap();
let verif_reporter =
VerificationReporter::new(PUS_EVENT_MANAGEMENT.id(), &verif_reporter_cfg);
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
let pus_event_handler =
PusEventHandler::new(tm_sender, verif_reporter, &mut event_manager, event_req_rx);
Self {
event_tx,
tm_receiver,
event_manager,
pus_event_handler,
}
}
}
#[test]
fn test_basic_event_generation() {
let mut testbench = EventManagementTestbench::new();
testbench
.event_tx
.send(EventMessageU32::new(
TEST_CREATOR_ID.id(),
EventU32::new(satrs::events::Severity::Info, 1, 1),
))
.expect("failed to send event");
testbench.pus_event_handler.handle_event_requests();
testbench.event_manager.try_event_handling(|_, _| {});
testbench.pus_event_handler.generate_pus_event_tm();
let tm_packet = testbench
.tm_receiver
.try_recv()
.expect("failed to receive TM packet");
assert_eq!(tm_packet.sender_id, PUS_EVENT_MANAGEMENT.id());
let tm_reader = PusTmReader::new(&tm_packet.packet, 7)
.expect("failed to create TM reader")
.0;
assert_eq!(tm_reader.apid(), TEST_CREATOR_ID.apid);
assert_eq!(tm_reader.user_data().len(), 4);
let event_read_back = EventU32::from_be_bytes(tm_reader.user_data().try_into().unwrap());
assert_eq!(event_read_back, TEST_EVENT);
}
#[test]
fn test_basic_event_disabled() {
// TODO: Add test.
}
}

View File

@ -44,7 +44,6 @@ pub struct TcpSppClientCommon {
#[allow(dead_code)] #[allow(dead_code)]
impl TcpSppClientCommon { impl TcpSppClientCommon {
pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> { pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
let mut dummy = 0;
if SPP_CLIENT_WIRETAPPING_RX { if SPP_CLIENT_WIRETAPPING_RX {
log::debug!( log::debug!(
"SPP TCP RX {} bytes: {:x?}", "SPP TCP RX {} bytes: {:x?}",
@ -54,11 +53,10 @@ impl TcpSppClientCommon {
} }
// This parser is able to deal with broken tail packets, but we ignore those for now.. // This parser is able to deal with broken tail packets, but we ignore those for now..
parse_buffer_for_ccsds_space_packets( parse_buffer_for_ccsds_space_packets(
&mut self.read_buf[..read_bytes], &self.read_buf[..read_bytes],
&self.validator, &self.validator,
self.id, self.id,
&self.tc_source_tx, &self.tc_source_tx,
&mut dummy,
)?; )?;
Ok(()) Ok(())
} }

View File

@ -32,3 +32,12 @@ impl Default for TimeStampHelper {
} }
} }
} }
pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) {
time_provider
.update_from_now()
.expect("Could not get current time");
time_provider
.write_to_bytes(timestamp)
.expect("Writing timestamp failed");
}

View File

@ -6,25 +6,31 @@ use std::{
}; };
use log::info; use log::info;
use ops_sat_rs::config::components::CAMERA_HANDLER;
use ops_sat_rs::config::{ use ops_sat_rs::config::{
cfg_file::create_app_config, cfg_file::create_app_config,
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER}, components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
pool::create_sched_tc_pool, pool::create_sched_tc_pool,
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY}, tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
VALID_PACKET_ID_LIST, VALID_PACKET_ID_LIST,
}; };
use ops_sat_rs::config::{components::CAMERA_HANDLER, tasks::FREQ_MS_EVENT_HANDLING};
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT}; use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
use ops_sat_rs::TimeStampHelper; use ops_sat_rs::TimeStampHelper;
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer}; use satrs::{
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
use crate::handlers::camera::IMS100BatchHandler; pus::event_man::EventRequestWithToken,
use crate::pus::{
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
PusTcDistributor, PusTcMpscRouter,
}; };
use crate::tmtc::tm_sink::TmFunnelDynamic; use crate::tmtc::tm_sink::TmFunnelDynamic;
use crate::{controller::ExperimentController, pus::test::create_test_service}; use crate::{controller::ExperimentController, pus::test::create_test_service};
use crate::{
events::EventHandler,
pus::{
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
PusTcDistributor, PusTcMpscRouter,
},
};
use crate::{handlers::camera::IMS100BatchHandler, pus::event::create_event_service};
use crate::{ use crate::{
interface::tcp_server::{SyncTcpTmSource, TcpTask}, interface::tcp_server::{SyncTcpTmSource, TcpTask},
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer}, interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
@ -37,6 +43,7 @@ use crate::{
}; };
mod controller; mod controller;
mod events;
mod handlers; mod handlers;
mod interface; mod interface;
mod logger; mod logger;
@ -59,12 +66,21 @@ fn main() {
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel(); let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
let (pus_test_tx, pus_test_rx) = mpsc::channel(); let (pus_test_tx, pus_test_rx) = mpsc::channel();
// let (pus_event_tx, pus_event_rx) = mpsc::channel(); let (pus_event_tx, pus_event_rx) = mpsc::channel();
let (pus_sched_tx, pus_sched_rx) = mpsc::channel(); let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
let (pus_hk_tx, pus_hk_rx) = mpsc::channel(); let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
let (pus_action_tx, pus_action_rx) = mpsc::channel(); let (pus_action_tx, pus_action_rx) = mpsc::channel();
let (pus_mode_tx, pus_mode_rx) = mpsc::channel(); let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
// Create event handling components
// These sender handles are used to send event requests, for example to enable or disable
// certain events.
let (event_tx, event_rx) = mpsc::sync_channel(100);
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
// The event task is the core handler to perform the event routing and TM handling as specified
// in the sat-rs documentation.
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel(); let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel(); let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel(); let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
@ -83,27 +99,22 @@ fn main() {
let pus_router = PusTcMpscRouter { let pus_router = PusTcMpscRouter {
test_tc_sender: pus_test_tx, test_tc_sender: pus_test_tx,
// event_tc_sender: pus_event_tx, event_tc_sender: pus_event_tx,
sched_tc_sender: pus_sched_tx, sched_tc_sender: pus_sched_tx,
hk_tc_sender: pus_hk_tx, hk_tc_sender: pus_hk_tx,
action_tc_sender: pus_action_tx, action_tc_sender: pus_action_tx,
mode_tc_sender: pus_mode_tx, mode_tc_sender: pus_mode_tx,
}; };
let pus_test_service = create_test_service( let pus_test_service = create_test_service(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
tm_funnel_tx.clone(),
// event_handler.clone_event_sender(),
pus_test_rx,
);
let pus_scheduler_service = create_scheduler_service( let pus_scheduler_service = create_scheduler_service(
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
tc_source_tx.clone(), tc_source_tx.clone(),
pus_sched_rx, pus_sched_rx,
create_sched_tc_pool(), create_sched_tc_pool(),
); );
// let pus_event_service =
// let pus_event_service = create_event_service(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
let pus_action_service = create_action_service( let pus_action_service = create_action_service(
tm_funnel_tx.clone(), tm_funnel_tx.clone(),
pus_action_rx, pus_action_rx,
@ -125,7 +136,7 @@ fn main() {
let mut pus_stack = PusStack::new( let mut pus_stack = PusStack::new(
pus_test_service, pus_test_service,
pus_hk_service, pus_hk_service,
// pus_event_service, pus_event_service,
pus_action_service, pus_action_service,
pus_scheduler_service, pus_scheduler_service,
pus_mode_service, pus_mode_service,
@ -163,7 +174,7 @@ fn main() {
) )
.expect("tcp server creation failed"); .expect("tcp server creation failed");
let mut tm_funnel = TmFunnelDynamic::new( let mut tm_sink = TmFunnelDynamic::new(
sync_tm_tcp_source, sync_tm_tcp_source,
tm_funnel_rx, tm_funnel_rx,
tm_tcp_server_tx, tm_tcp_server_tx,
@ -270,17 +281,26 @@ fn main() {
// TM Funnel Task // TM Funnel Task
info!("Starting TM funnel task"); info!("Starting TM funnel task");
let funnel_stop_signal = stop_signal.clone(); let tm_sink_stop_signal = stop_signal.clone();
let jh_tm_funnel = thread::Builder::new() let jh_tm_funnel = thread::Builder::new()
.name("ops-sat tm-funnel".to_string()) .name("ops-sat tm-sink".to_string())
.spawn(move || loop { .spawn(move || loop {
tm_funnel.operation(); tm_sink.operation();
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { if tm_sink_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break; break;
} }
}) })
.unwrap(); .unwrap();
info!("Starting event handling task");
let jh_event_handling = thread::Builder::new()
.name("sat-rs events".to_string())
.spawn(move || loop {
event_handler.periodic_operation();
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
})
.unwrap();
// PUS Handler Task // PUS Handler Task
info!("Starting PUS handlers task"); info!("Starting PUS handlers task");
let pus_stop_signal = stop_signal.clone(); let pus_stop_signal = stop_signal.clone();
@ -305,6 +325,7 @@ fn main() {
if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) { if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
break; break;
} }
thread::sleep(Duration::from_millis(FREQ_MS_CAMERA_HANDLING));
}) })
.unwrap(); .unwrap();
@ -327,6 +348,9 @@ fn main() {
jh_pus_handler jh_pus_handler
.join() .join()
.expect("Joining PUS handlers thread failed"); .expect("Joining PUS handlers thread failed");
jh_event_handling
.join()
.expect("Joining PUS handlers thread failed");
jh_camera_handler jh_camera_handler
.join() .join()
.expect("Joining camera handler thread failed"); .expect("Joining camera handler thread failed");

66
src/pus/event.rs Normal file
View File

@ -0,0 +1,66 @@
use std::sync::mpsc;
use super::HandlingStatus;
use crate::pus::create_verification_reporter;
use log::{error, warn};
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
use satrs::pus::event_man::EventRequestWithToken;
use satrs::pus::event_srv::PusEventServiceHandler;
use satrs::pus::verification::VerificationReporter;
use satrs::pus::{
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
};
use satrs::tmtc::PacketAsVec;
pub fn create_event_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
event_request_tx: mpsc::Sender<EventRequestWithToken>,
) -> EventServiceWrapper {
let pus_5_handler = PusEventServiceHandler::new(
PusServiceHelper::new(
PUS_EVENT_MANAGEMENT.id(),
pus_event_rx,
tm_funnel_tx,
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
EcssTcInVecConverter::default(),
),
event_request_tx,
);
EventServiceWrapper {
handler: pus_5_handler,
}
}
pub struct EventServiceWrapper {
pub handler: PusEventServiceHandler<
MpscTcReceiver,
mpsc::Sender<PacketAsVec>,
EcssTcInVecConverter,
VerificationReporter,
>,
}
impl EventServiceWrapper {
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
match self.handler.poll_and_handle_next_tc(time_stamp) {
Ok(result) => match result {
PusPacketHandlerResult::RequestHandled => {}
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
warn!("PUS 5 partial packet handling success: {e:?}")
}
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
warn!("PUS 5 invalid subservice {invalid}");
}
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
warn!("PUS 5 subservice {subservice} not implemented");
}
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
},
Err(error) => {
error!("PUS packet handling error: {error:?}")
}
}
HandlingStatus::HandledOne
}
}

View File

@ -1,4 +1,5 @@
pub mod action; pub mod action;
pub mod event;
pub mod hk; pub mod hk;
pub mod mode; pub mod mode;
pub mod scheduler; pub mod scheduler;
@ -46,7 +47,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
/// Simple router structure which forwards PUS telecommands to dedicated handlers. /// Simple router structure which forwards PUS telecommands to dedicated handlers.
pub struct PusTcMpscRouter { pub struct PusTcMpscRouter {
pub test_tc_sender: Sender<EcssTcAndToken>, pub test_tc_sender: Sender<EcssTcAndToken>,
// pub event_tc_sender: Sender<EcssTcAndToken>, pub event_tc_sender: Sender<EcssTcAndToken>,
pub sched_tc_sender: Sender<EcssTcAndToken>, pub sched_tc_sender: Sender<EcssTcAndToken>,
pub hk_tc_sender: Sender<EcssTcAndToken>, pub hk_tc_sender: Sender<EcssTcAndToken>,
pub action_tc_sender: Sender<EcssTcAndToken>, pub action_tc_sender: Sender<EcssTcAndToken>,
@ -109,16 +110,16 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
tc_in_memory, tc_in_memory,
token: Some(accepted_token.into()), token: Some(accepted_token.into()),
})?, })?,
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken { PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
// tc_in_memory, tc_in_memory,
// token: Some(accepted_token.into()), token: Some(accepted_token.into()),
// })?, })?,
// PusServiceId::Scheduling => { PusServiceId::Scheduling => {
// self.pus_router.sched_tc_sender.send(EcssTcAndToken { self.pus_router.sched_tc_sender.send(EcssTcAndToken {
// tc_in_memory, tc_in_memory,
// token: Some(accepted_token.into()), token: Some(accepted_token.into()),
// })? })?
// } }
_ => { _ => {
let result = self.verif_reporter.start_failure( let result = self.verif_reporter.start_failure(
&self.tm_sender, &self.tm_sender,
@ -138,10 +139,10 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) { if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
match custom_service { match custom_service {
CustomPusServiceId::Mode => { CustomPusServiceId::Mode => {
// self.pus_router.mode_tc_sender.send(EcssTcAndToken { self.pus_router.mode_tc_sender.send(EcssTcAndToken {
// tc_in_memory, tc_in_memory,
// token: Some(accepted_token.into()), token: Some(accepted_token.into()),
// })? })?
} }
CustomPusServiceId::Health => {} CustomPusServiceId::Health => {}
} }

View File

@ -4,15 +4,15 @@ use derive_new::new;
use satrs::spacepackets::time::{cds, TimeWriter}; use satrs::spacepackets::time::{cds, TimeWriter};
use super::{ use super::{
action::ActionServiceWrapper, hk::HkServiceWrapper, mode::ModeServiceWrapper, action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
scheduler::SchedulingService, TargetedPusService, mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
}; };
#[derive(new)] #[derive(new)]
pub struct PusStack { pub struct PusStack {
test_srv: TestCustomServiceWrapper, test_srv: TestCustomServiceWrapper,
hk_srv_wrapper: HkServiceWrapper, hk_srv_wrapper: HkServiceWrapper,
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>, event_srv: EventServiceWrapper,
action_srv_wrapper: ActionServiceWrapper, action_srv_wrapper: ActionServiceWrapper,
schedule_srv: SchedulingService, schedule_srv: SchedulingService,
mode_srv: ModeServiceWrapper, mode_srv: ModeServiceWrapper,
@ -50,7 +50,7 @@ impl PusStack {
self.schedule_srv.poll_and_handle_next_tc(&time_stamp), self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
None, None,
); );
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None); is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
is_srv_finished( is_srv_finished(
8, 8,
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp), self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),

View File

@ -1,8 +1,8 @@
use crate::pus::create_verification_reporter; use crate::pus::create_verification_reporter;
use log::{info, warn}; use log::{info, warn};
use ops_sat_rs::config::components::PUS_TEST_SERVICE; use ops_sat_rs::config::components::PUS_TEST_SERVICE;
use ops_sat_rs::config::tmtc_err; use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
// use satrs::event_man::{EventMessage, EventMessageU32}; use satrs::event_man::{EventMessage, EventMessageU32};
use satrs::pus::test::PusService17TestHandler; use satrs::pus::test::PusService17TestHandler;
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider}; use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
use satrs::pus::{ use satrs::pus::{
@ -20,6 +20,7 @@ use super::HandlingStatus;
pub fn create_test_service( pub fn create_test_service(
tm_funnel_tx: mpsc::Sender<PacketAsVec>, tm_funnel_tx: mpsc::Sender<PacketAsVec>,
event_tx: mpsc::SyncSender<EventMessageU32>,
pus_test_rx: mpsc::Receiver<EcssTcAndToken>, pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
) -> TestCustomServiceWrapper { ) -> TestCustomServiceWrapper {
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new( let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
@ -31,7 +32,7 @@ pub fn create_test_service(
)); ));
TestCustomServiceWrapper { TestCustomServiceWrapper {
handler: pus17_handler, handler: pus17_handler,
// test_srv_event_sender: event_sender, event_tx,
} }
} }
@ -42,6 +43,7 @@ pub struct TestCustomServiceWrapper {
EcssTcInVecConverter, EcssTcInVecConverter,
VerificationReporter, VerificationReporter,
>, >,
pub event_tx: mpsc::SyncSender<EventMessageU32>,
} }
impl TestCustomServiceWrapper { impl TestCustomServiceWrapper {
@ -79,9 +81,9 @@ impl TestCustomServiceWrapper {
time_stamper.write_to_bytes(&mut stamp_buf).unwrap(); time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
if subservice == 128 { if subservice == 128 {
info!("Generating test event"); info!("Generating test event");
// self.test_srv_event_sender self.event_tx
// .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into())) .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
// .expect("Sending test event failed"); .expect("Sending test event failed");
let start_token = self let start_token = self
.handler .handler
.service_helper .service_helper