add event management, small fix for CAM handler loop #17
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -733,9 +733,9 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
|
||||
|
||||
[[package]]
|
||||
name = "satrs"
|
||||
version = "0.2.0-rc.3"
|
||||
version = "0.2.0-rc.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6aa9241e4d6cb0cc395927cfe653d8bc4a9cb6b2c27f28fec713d5e6ceb0ba23"
|
||||
checksum = "2adc1d9369e3f7e21dabb3181e36c914d1a3f68f4900207a2baa129c2fd5baba"
|
||||
dependencies = [
|
||||
"bus",
|
||||
"cobs",
|
||||
@ -783,9 +783,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "satrs-shared"
|
||||
version = "0.1.3"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f01df804b902334a23c539db5e37f11bf41ecb86596292e7cc091628bf2c4f67"
|
||||
checksum = "6042477018c2d43fffccaaa5099bc299a58485139b4d31c5b276889311e474f1"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"spacepackets",
|
||||
|
@ -23,7 +23,7 @@ homedir = "0.2"
|
||||
socket2 = "0.5"
|
||||
|
||||
[dependencies.satrs]
|
||||
version = "0.2.0-rc.3"
|
||||
version = "0.2.0-rc.5"
|
||||
# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
|
||||
# branch = "main"
|
||||
features = ["test_util"]
|
||||
|
@ -1,5 +1,6 @@
|
||||
use lazy_static::lazy_static;
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
use satrs::events::{EventU32TypedSev, SeverityInfo};
|
||||
use satrs::spacepackets::PacketId;
|
||||
use satrs_mib::res_code::ResultU16Info;
|
||||
use satrs_mib::resultcode;
|
||||
@ -39,6 +40,9 @@ pub enum GroupId {
|
||||
Action = 3,
|
||||
}
|
||||
|
||||
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
|
||||
EventU32TypedSev::<SeverityInfo>::new(GroupId::Tmtc as u16, 0);
|
||||
|
||||
lazy_static! {
|
||||
pub static ref HOME_PATH: PathBuf = {
|
||||
let mut home_path = PathBuf::new();
|
||||
@ -280,6 +284,7 @@ pub mod tasks {
|
||||
pub const FREQ_MS_AOCS: u64 = 500;
|
||||
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
||||
pub const FREQ_MS_CTRL: u64 = 400;
|
||||
pub const FREQ_MS_CAMERA_HANDLING: u64 = 400;
|
||||
|
||||
pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
|
||||
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);
|
||||
|
287
src/events.rs
Normal file
287
src/events.rs
Normal file
@ -0,0 +1,287 @@
|
||||
use std::sync::mpsc::{self};
|
||||
|
||||
use crate::pus::create_verification_reporter;
|
||||
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
||||
use satrs::event_man::{EventMessageU32, EventRoutingError};
|
||||
use satrs::params::WritableToBeBytes;
|
||||
use satrs::pus::event::EventTmHookProvider;
|
||||
use satrs::pus::verification::VerificationReporter;
|
||||
use satrs::request::UniqueApidTargetId;
|
||||
use satrs::tmtc::PacketAsVec;
|
||||
use satrs::{
|
||||
event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
|
||||
pus::{
|
||||
event_man::{
|
||||
DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
|
||||
},
|
||||
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
|
||||
},
|
||||
spacepackets::time::cds::CdsTime,
|
||||
};
|
||||
|
||||
use ops_sat_rs::update_time;
|
||||
|
||||
// This helper sets the APID of the event sender for the PUS telemetry.
|
||||
#[derive(Default)]
|
||||
pub struct EventApidSetter {
|
||||
pub next_apid: u16,
|
||||
}
|
||||
|
||||
impl EventTmHookProvider for EventApidSetter {
|
||||
fn modify_tm(&self, tm: &mut satrs::spacepackets::ecss::tm::PusTmCreator) {
|
||||
tm.set_apid(self.next_apid);
|
||||
}
|
||||
}
|
||||
|
||||
/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event
|
||||
/// packets. It also handles the verification completion of PUS event service requests.
|
||||
pub struct PusEventHandler {
|
||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||
pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
|
||||
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
|
||||
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||
time_provider: CdsTime,
|
||||
timestamp: [u8; 7],
|
||||
verif_handler: VerificationReporter,
|
||||
}
|
||||
|
||||
impl PusEventHandler {
|
||||
pub fn new(
|
||||
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||
verif_handler: VerificationReporter,
|
||||
event_manager: &mut EventManagerWithBoundedMpsc,
|
||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||
) -> Self {
|
||||
let event_queue_cap = 30;
|
||||
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap);
|
||||
|
||||
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
||||
// telemetry for each event.
|
||||
let event_reporter = EventReporter::new_with_hook(
|
||||
PUS_EVENT_MANAGEMENT.raw(),
|
||||
0,
|
||||
0,
|
||||
128,
|
||||
EventApidSetter::default(),
|
||||
)
|
||||
.unwrap();
|
||||
let pus_event_dispatcher =
|
||||
DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
|
||||
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
|
||||
PUS_EVENT_MANAGEMENT.raw(),
|
||||
pus_event_man_tx,
|
||||
event_queue_cap,
|
||||
);
|
||||
|
||||
event_manager.subscribe_all(pus_event_man_send_provider.target_id());
|
||||
event_manager.add_sender(pus_event_man_send_provider);
|
||||
|
||||
Self {
|
||||
event_request_rx,
|
||||
pus_event_tm_creator: pus_event_dispatcher,
|
||||
pus_event_man_rx,
|
||||
time_provider: CdsTime::new_with_u16_days(0, 0),
|
||||
timestamp: [0; 7],
|
||||
verif_handler,
|
||||
tm_sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_event_requests(&mut self) {
|
||||
let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
|
||||
let started_token: VerificationToken<TcStateStarted> = event_req
|
||||
.token
|
||||
.try_into()
|
||||
.expect("expected start verification token");
|
||||
self.verif_handler
|
||||
.completion_success(&self.tm_sender, started_token, timestamp)
|
||||
.expect("Sending completion success failed");
|
||||
};
|
||||
loop {
|
||||
// handle event requests
|
||||
match self.event_request_rx.try_recv() {
|
||||
Ok(event_req) => match event_req.request {
|
||||
EventRequest::Enable(event) => {
|
||||
self.pus_event_tm_creator
|
||||
.enable_tm_for_event(&event)
|
||||
.expect("Enabling TM failed");
|
||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||
report_completion(event_req, &self.timestamp);
|
||||
}
|
||||
EventRequest::Disable(event) => {
|
||||
self.pus_event_tm_creator
|
||||
.disable_tm_for_event(&event)
|
||||
.expect("Disabling TM failed");
|
||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||
report_completion(event_req, &self.timestamp);
|
||||
}
|
||||
},
|
||||
Err(e) => match e {
|
||||
mpsc::TryRecvError::Empty => break,
|
||||
mpsc::TryRecvError::Disconnected => {
|
||||
log::warn!("all event request senders have disconnected");
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_pus_event_tm(&mut self) {
|
||||
loop {
|
||||
// Perform the generation of PUS event packets
|
||||
match self.pus_event_man_rx.try_recv() {
|
||||
Ok(event_msg) => {
|
||||
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
|
||||
param.to_vec().expect("failed to convert params to vec")
|
||||
});
|
||||
// We use the TM modification hook to set the sender APID for each event.
|
||||
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
|
||||
UniqueApidTargetId::from(event_msg.sender_id()).apid;
|
||||
self.pus_event_tm_creator
|
||||
.generate_pus_event_tm_generic(
|
||||
&self.tm_sender,
|
||||
&self.timestamp,
|
||||
event_msg.event(),
|
||||
Some(¶m_vec),
|
||||
)
|
||||
.expect("Sending TM as event failed");
|
||||
}
|
||||
Err(e) => match e {
|
||||
mpsc::TryRecvError::Empty => break,
|
||||
mpsc::TryRecvError::Disconnected => {
|
||||
log::warn!("All event senders have disconnected");
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventHandler {
|
||||
pub pus_event_handler: PusEventHandler,
|
||||
event_manager: EventManagerWithBoundedMpsc,
|
||||
}
|
||||
|
||||
impl EventHandler {
|
||||
pub fn new(
|
||||
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||
event_rx: mpsc::Receiver<EventMessageU32>,
|
||||
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||
) -> Self {
|
||||
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
|
||||
let pus_event_handler = PusEventHandler::new(
|
||||
tm_sender,
|
||||
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
||||
&mut event_manager,
|
||||
event_request_rx,
|
||||
);
|
||||
|
||||
Self {
|
||||
pus_event_handler,
|
||||
event_manager,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
||||
&mut self.event_manager
|
||||
}
|
||||
|
||||
pub fn periodic_operation(&mut self) {
|
||||
self.pus_event_handler.handle_event_requests();
|
||||
self.try_event_routing();
|
||||
self.pus_event_handler.generate_pus_event_tm();
|
||||
}
|
||||
|
||||
pub fn try_event_routing(&mut self) {
|
||||
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||
self.routing_error_handler(event_msg, error)
|
||||
};
|
||||
// Perform the event routing.
|
||||
self.event_manager.try_event_handling(error_handler);
|
||||
}
|
||||
|
||||
pub fn routing_error_handler(&self, event_msg: &EventMessageU32, error: EventRoutingError) {
|
||||
log::warn!("event routing error for event {event_msg:?}: {error:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use satrs::{
|
||||
events::EventU32,
|
||||
pus::verification::VerificationReporterCfg,
|
||||
spacepackets::{
|
||||
ecss::{tm::PusTmReader, PusPacket},
|
||||
CcsdsPacket,
|
||||
},
|
||||
tmtc::PacketAsVec,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
const TEST_CREATOR_ID: UniqueApidTargetId = UniqueApidTargetId::new(1, 2);
|
||||
const TEST_EVENT: EventU32 = EventU32::new(satrs::events::Severity::Info, 1, 1);
|
||||
|
||||
pub struct EventManagementTestbench {
|
||||
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||
pub event_manager: EventManagerWithBoundedMpsc,
|
||||
pub tm_receiver: mpsc::Receiver<PacketAsVec>,
|
||||
pub pus_event_handler: PusEventHandler,
|
||||
}
|
||||
|
||||
impl EventManagementTestbench {
|
||||
pub fn new() -> Self {
|
||||
let (event_tx, event_rx) = mpsc::sync_channel(10);
|
||||
let (_event_req_tx, event_req_rx) = mpsc::sync_channel(10);
|
||||
let (tm_sender, tm_receiver) = mpsc::channel();
|
||||
let verif_reporter_cfg = VerificationReporterCfg::new(0x05, 2, 2, 128).unwrap();
|
||||
let verif_reporter =
|
||||
VerificationReporter::new(PUS_EVENT_MANAGEMENT.id(), &verif_reporter_cfg);
|
||||
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
|
||||
let pus_event_handler =
|
||||
PusEventHandler::new(tm_sender, verif_reporter, &mut event_manager, event_req_rx);
|
||||
Self {
|
||||
event_tx,
|
||||
tm_receiver,
|
||||
event_manager,
|
||||
pus_event_handler,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_event_generation() {
|
||||
let mut testbench = EventManagementTestbench::new();
|
||||
testbench
|
||||
.event_tx
|
||||
.send(EventMessageU32::new(
|
||||
TEST_CREATOR_ID.id(),
|
||||
EventU32::new(satrs::events::Severity::Info, 1, 1),
|
||||
))
|
||||
.expect("failed to send event");
|
||||
testbench.pus_event_handler.handle_event_requests();
|
||||
testbench.event_manager.try_event_handling(|_, _| {});
|
||||
testbench.pus_event_handler.generate_pus_event_tm();
|
||||
let tm_packet = testbench
|
||||
.tm_receiver
|
||||
.try_recv()
|
||||
.expect("failed to receive TM packet");
|
||||
assert_eq!(tm_packet.sender_id, PUS_EVENT_MANAGEMENT.id());
|
||||
let tm_reader = PusTmReader::new(&tm_packet.packet, 7)
|
||||
.expect("failed to create TM reader")
|
||||
.0;
|
||||
assert_eq!(tm_reader.apid(), TEST_CREATOR_ID.apid);
|
||||
assert_eq!(tm_reader.user_data().len(), 4);
|
||||
let event_read_back = EventU32::from_be_bytes(tm_reader.user_data().try_into().unwrap());
|
||||
assert_eq!(event_read_back, TEST_EVENT);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_event_disabled() {
|
||||
// TODO: Add test.
|
||||
}
|
||||
}
|
@ -44,7 +44,6 @@ pub struct TcpSppClientCommon {
|
||||
#[allow(dead_code)]
|
||||
impl TcpSppClientCommon {
|
||||
pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
|
||||
let mut dummy = 0;
|
||||
if SPP_CLIENT_WIRETAPPING_RX {
|
||||
log::debug!(
|
||||
"SPP TCP RX {} bytes: {:x?}",
|
||||
@ -54,11 +53,10 @@ impl TcpSppClientCommon {
|
||||
}
|
||||
// This parser is able to deal with broken tail packets, but we ignore those for now..
|
||||
parse_buffer_for_ccsds_space_packets(
|
||||
&mut self.read_buf[..read_bytes],
|
||||
&self.read_buf[..read_bytes],
|
||||
&self.validator,
|
||||
self.id,
|
||||
&self.tc_source_tx,
|
||||
&mut dummy,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -32,3 +32,12 @@ impl Default for TimeStampHelper {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) {
|
||||
time_provider
|
||||
.update_from_now()
|
||||
.expect("Could not get current time");
|
||||
time_provider
|
||||
.write_to_bytes(timestamp)
|
||||
.expect("Writing timestamp failed");
|
||||
}
|
||||
|
72
src/main.rs
72
src/main.rs
@ -6,25 +6,31 @@ use std::{
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use ops_sat_rs::config::components::CAMERA_HANDLER;
|
||||
use ops_sat_rs::config::{
|
||||
cfg_file::create_app_config,
|
||||
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
|
||||
pool::create_sched_tc_pool,
|
||||
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
||||
tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
||||
VALID_PACKET_ID_LIST,
|
||||
};
|
||||
use ops_sat_rs::config::{components::CAMERA_HANDLER, tasks::FREQ_MS_EVENT_HANDLING};
|
||||
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
|
||||
use ops_sat_rs::TimeStampHelper;
|
||||
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
|
||||
|
||||
use crate::handlers::camera::IMS100BatchHandler;
|
||||
use crate::pus::{
|
||||
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
|
||||
PusTcDistributor, PusTcMpscRouter,
|
||||
use satrs::{
|
||||
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
|
||||
pus::event_man::EventRequestWithToken,
|
||||
};
|
||||
|
||||
use crate::tmtc::tm_sink::TmFunnelDynamic;
|
||||
use crate::{controller::ExperimentController, pus::test::create_test_service};
|
||||
use crate::{
|
||||
events::EventHandler,
|
||||
pus::{
|
||||
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
|
||||
PusTcDistributor, PusTcMpscRouter,
|
||||
},
|
||||
};
|
||||
use crate::{handlers::camera::IMS100BatchHandler, pus::event::create_event_service};
|
||||
use crate::{
|
||||
interface::tcp_server::{SyncTcpTmSource, TcpTask},
|
||||
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
|
||||
@ -37,6 +43,7 @@ use crate::{
|
||||
};
|
||||
|
||||
mod controller;
|
||||
mod events;
|
||||
mod handlers;
|
||||
mod interface;
|
||||
mod logger;
|
||||
@ -59,12 +66,21 @@ fn main() {
|
||||
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
|
||||
|
||||
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
||||
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||
let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
|
||||
let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
|
||||
let (pus_action_tx, pus_action_rx) = mpsc::channel();
|
||||
let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
|
||||
|
||||
// Create event handling components
|
||||
// These sender handles are used to send event requests, for example to enable or disable
|
||||
// certain events.
|
||||
let (event_tx, event_rx) = mpsc::sync_channel(100);
|
||||
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
||||
// The event task is the core handler to perform the event routing and TM handling as specified
|
||||
// in the sat-rs documentation.
|
||||
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
|
||||
|
||||
let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
|
||||
let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
|
||||
let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
|
||||
@ -83,27 +99,22 @@ fn main() {
|
||||
|
||||
let pus_router = PusTcMpscRouter {
|
||||
test_tc_sender: pus_test_tx,
|
||||
// event_tc_sender: pus_event_tx,
|
||||
event_tc_sender: pus_event_tx,
|
||||
sched_tc_sender: pus_sched_tx,
|
||||
hk_tc_sender: pus_hk_tx,
|
||||
action_tc_sender: pus_action_tx,
|
||||
mode_tc_sender: pus_mode_tx,
|
||||
};
|
||||
|
||||
let pus_test_service = create_test_service(
|
||||
tm_funnel_tx.clone(),
|
||||
// event_handler.clone_event_sender(),
|
||||
pus_test_rx,
|
||||
);
|
||||
let pus_test_service = create_test_service(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
|
||||
let pus_scheduler_service = create_scheduler_service(
|
||||
tm_funnel_tx.clone(),
|
||||
tc_source_tx.clone(),
|
||||
pus_sched_rx,
|
||||
create_sched_tc_pool(),
|
||||
);
|
||||
//
|
||||
// let pus_event_service =
|
||||
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
||||
let pus_event_service =
|
||||
create_event_service(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
||||
let pus_action_service = create_action_service(
|
||||
tm_funnel_tx.clone(),
|
||||
pus_action_rx,
|
||||
@ -125,7 +136,7 @@ fn main() {
|
||||
let mut pus_stack = PusStack::new(
|
||||
pus_test_service,
|
||||
pus_hk_service,
|
||||
// pus_event_service,
|
||||
pus_event_service,
|
||||
pus_action_service,
|
||||
pus_scheduler_service,
|
||||
pus_mode_service,
|
||||
@ -163,7 +174,7 @@ fn main() {
|
||||
)
|
||||
.expect("tcp server creation failed");
|
||||
|
||||
let mut tm_funnel = TmFunnelDynamic::new(
|
||||
let mut tm_sink = TmFunnelDynamic::new(
|
||||
sync_tm_tcp_source,
|
||||
tm_funnel_rx,
|
||||
tm_tcp_server_tx,
|
||||
@ -270,17 +281,26 @@ fn main() {
|
||||
|
||||
// TM Funnel Task
|
||||
info!("Starting TM funnel task");
|
||||
let funnel_stop_signal = stop_signal.clone();
|
||||
let tm_sink_stop_signal = stop_signal.clone();
|
||||
let jh_tm_funnel = thread::Builder::new()
|
||||
.name("ops-sat tm-funnel".to_string())
|
||||
.name("ops-sat tm-sink".to_string())
|
||||
.spawn(move || loop {
|
||||
tm_funnel.operation();
|
||||
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
tm_sink.operation();
|
||||
if tm_sink_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
info!("Starting event handling task");
|
||||
let jh_event_handling = thread::Builder::new()
|
||||
.name("sat-rs events".to_string())
|
||||
.spawn(move || loop {
|
||||
event_handler.periodic_operation();
|
||||
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// PUS Handler Task
|
||||
info!("Starting PUS handlers task");
|
||||
let pus_stop_signal = stop_signal.clone();
|
||||
@ -305,6 +325,7 @@ fn main() {
|
||||
if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
thread::sleep(Duration::from_millis(FREQ_MS_CAMERA_HANDLING));
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@ -327,6 +348,9 @@ fn main() {
|
||||
jh_pus_handler
|
||||
.join()
|
||||
.expect("Joining PUS handlers thread failed");
|
||||
jh_event_handling
|
||||
.join()
|
||||
.expect("Joining PUS handlers thread failed");
|
||||
jh_camera_handler
|
||||
.join()
|
||||
.expect("Joining camera handler thread failed");
|
||||
|
66
src/pus/event.rs
Normal file
66
src/pus/event.rs
Normal file
@ -0,0 +1,66 @@
|
||||
use std::sync::mpsc;
|
||||
|
||||
use super::HandlingStatus;
|
||||
use crate::pus::create_verification_reporter;
|
||||
use log::{error, warn};
|
||||
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
||||
use satrs::pus::event_man::EventRequestWithToken;
|
||||
use satrs::pus::event_srv::PusEventServiceHandler;
|
||||
use satrs::pus::verification::VerificationReporter;
|
||||
use satrs::pus::{
|
||||
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
|
||||
};
|
||||
use satrs::tmtc::PacketAsVec;
|
||||
|
||||
pub fn create_event_service(
|
||||
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||
event_request_tx: mpsc::Sender<EventRequestWithToken>,
|
||||
) -> EventServiceWrapper {
|
||||
let pus_5_handler = PusEventServiceHandler::new(
|
||||
PusServiceHelper::new(
|
||||
PUS_EVENT_MANAGEMENT.id(),
|
||||
pus_event_rx,
|
||||
tm_funnel_tx,
|
||||
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
||||
EcssTcInVecConverter::default(),
|
||||
),
|
||||
event_request_tx,
|
||||
);
|
||||
EventServiceWrapper {
|
||||
handler: pus_5_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventServiceWrapper {
|
||||
pub handler: PusEventServiceHandler<
|
||||
MpscTcReceiver,
|
||||
mpsc::Sender<PacketAsVec>,
|
||||
EcssTcInVecConverter,
|
||||
VerificationReporter,
|
||||
>,
|
||||
}
|
||||
|
||||
impl EventServiceWrapper {
|
||||
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||
match self.handler.poll_and_handle_next_tc(time_stamp) {
|
||||
Ok(result) => match result {
|
||||
PusPacketHandlerResult::RequestHandled => {}
|
||||
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
||||
warn!("PUS 5 partial packet handling success: {e:?}")
|
||||
}
|
||||
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
||||
warn!("PUS 5 invalid subservice {invalid}");
|
||||
}
|
||||
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||
warn!("PUS 5 subservice {subservice} not implemented");
|
||||
}
|
||||
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||
},
|
||||
Err(error) => {
|
||||
error!("PUS packet handling error: {error:?}")
|
||||
}
|
||||
}
|
||||
HandlingStatus::HandledOne
|
||||
}
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
pub mod action;
|
||||
pub mod event;
|
||||
pub mod hk;
|
||||
pub mod mode;
|
||||
pub mod scheduler;
|
||||
@ -46,7 +47,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
|
||||
/// Simple router structure which forwards PUS telecommands to dedicated handlers.
|
||||
pub struct PusTcMpscRouter {
|
||||
pub test_tc_sender: Sender<EcssTcAndToken>,
|
||||
// pub event_tc_sender: Sender<EcssTcAndToken>,
|
||||
pub event_tc_sender: Sender<EcssTcAndToken>,
|
||||
pub sched_tc_sender: Sender<EcssTcAndToken>,
|
||||
pub hk_tc_sender: Sender<EcssTcAndToken>,
|
||||
pub action_tc_sender: Sender<EcssTcAndToken>,
|
||||
@ -109,16 +110,16 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?,
|
||||
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
|
||||
// tc_in_memory,
|
||||
// token: Some(accepted_token.into()),
|
||||
// })?,
|
||||
// PusServiceId::Scheduling => {
|
||||
// self.pus_router.sched_tc_sender.send(EcssTcAndToken {
|
||||
// tc_in_memory,
|
||||
// token: Some(accepted_token.into()),
|
||||
// })?
|
||||
// }
|
||||
PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?,
|
||||
PusServiceId::Scheduling => {
|
||||
self.pus_router.sched_tc_sender.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?
|
||||
}
|
||||
_ => {
|
||||
let result = self.verif_reporter.start_failure(
|
||||
&self.tm_sender,
|
||||
@ -138,10 +139,10 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
||||
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
|
||||
match custom_service {
|
||||
CustomPusServiceId::Mode => {
|
||||
// self.pus_router.mode_tc_sender.send(EcssTcAndToken {
|
||||
// tc_in_memory,
|
||||
// token: Some(accepted_token.into()),
|
||||
// })?
|
||||
self.pus_router.mode_tc_sender.send(EcssTcAndToken {
|
||||
tc_in_memory,
|
||||
token: Some(accepted_token.into()),
|
||||
})?
|
||||
}
|
||||
CustomPusServiceId::Health => {}
|
||||
}
|
||||
|
@ -4,15 +4,15 @@ use derive_new::new;
|
||||
use satrs::spacepackets::time::{cds, TimeWriter};
|
||||
|
||||
use super::{
|
||||
action::ActionServiceWrapper, hk::HkServiceWrapper, mode::ModeServiceWrapper,
|
||||
scheduler::SchedulingService, TargetedPusService,
|
||||
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
||||
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
|
||||
};
|
||||
|
||||
#[derive(new)]
|
||||
pub struct PusStack {
|
||||
test_srv: TestCustomServiceWrapper,
|
||||
hk_srv_wrapper: HkServiceWrapper,
|
||||
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
|
||||
event_srv: EventServiceWrapper,
|
||||
action_srv_wrapper: ActionServiceWrapper,
|
||||
schedule_srv: SchedulingService,
|
||||
mode_srv: ModeServiceWrapper,
|
||||
@ -50,7 +50,7 @@ impl PusStack {
|
||||
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
||||
None,
|
||||
);
|
||||
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
||||
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
||||
is_srv_finished(
|
||||
8,
|
||||
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
||||
|
@ -1,8 +1,8 @@
|
||||
use crate::pus::create_verification_reporter;
|
||||
use log::{info, warn};
|
||||
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
|
||||
use ops_sat_rs::config::tmtc_err;
|
||||
// use satrs::event_man::{EventMessage, EventMessageU32};
|
||||
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
|
||||
use satrs::event_man::{EventMessage, EventMessageU32};
|
||||
use satrs::pus::test::PusService17TestHandler;
|
||||
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
|
||||
use satrs::pus::{
|
||||
@ -20,6 +20,7 @@ use super::HandlingStatus;
|
||||
|
||||
pub fn create_test_service(
|
||||
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||
event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||
) -> TestCustomServiceWrapper {
|
||||
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
||||
@ -31,7 +32,7 @@ pub fn create_test_service(
|
||||
));
|
||||
TestCustomServiceWrapper {
|
||||
handler: pus17_handler,
|
||||
// test_srv_event_sender: event_sender,
|
||||
event_tx,
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,6 +43,7 @@ pub struct TestCustomServiceWrapper {
|
||||
EcssTcInVecConverter,
|
||||
VerificationReporter,
|
||||
>,
|
||||
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||
}
|
||||
|
||||
impl TestCustomServiceWrapper {
|
||||
@ -79,9 +81,9 @@ impl TestCustomServiceWrapper {
|
||||
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
|
||||
if subservice == 128 {
|
||||
info!("Generating test event");
|
||||
// self.test_srv_event_sender
|
||||
// .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
||||
// .expect("Sending test event failed");
|
||||
self.event_tx
|
||||
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
||||
.expect("Sending test event failed");
|
||||
let start_token = self
|
||||
.handler
|
||||
.service_helper
|
||||
|
Loading…
Reference in New Issue
Block a user