add event management, small fix for CAM handler loop #17
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -733,9 +733,9 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs"
|
name = "satrs"
|
||||||
version = "0.2.0-rc.3"
|
version = "0.2.0-rc.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6aa9241e4d6cb0cc395927cfe653d8bc4a9cb6b2c27f28fec713d5e6ceb0ba23"
|
checksum = "2adc1d9369e3f7e21dabb3181e36c914d1a3f68f4900207a2baa129c2fd5baba"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bus",
|
"bus",
|
||||||
"cobs",
|
"cobs",
|
||||||
@ -783,9 +783,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "satrs-shared"
|
name = "satrs-shared"
|
||||||
version = "0.1.3"
|
version = "0.1.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f01df804b902334a23c539db5e37f11bf41ecb86596292e7cc091628bf2c4f67"
|
checksum = "6042477018c2d43fffccaaa5099bc299a58485139b4d31c5b276889311e474f1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"spacepackets",
|
"spacepackets",
|
||||||
|
@ -23,7 +23,7 @@ homedir = "0.2"
|
|||||||
socket2 = "0.5"
|
socket2 = "0.5"
|
||||||
|
|
||||||
[dependencies.satrs]
|
[dependencies.satrs]
|
||||||
version = "0.2.0-rc.3"
|
version = "0.2.0-rc.5"
|
||||||
# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
|
# git = "https://egit.irs.uni-stuttgart.de/rust/sat-rs.git"
|
||||||
# branch = "main"
|
# branch = "main"
|
||||||
features = ["test_util"]
|
features = ["test_util"]
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||||
|
use satrs::events::{EventU32TypedSev, SeverityInfo};
|
||||||
use satrs::spacepackets::PacketId;
|
use satrs::spacepackets::PacketId;
|
||||||
use satrs_mib::res_code::ResultU16Info;
|
use satrs_mib::res_code::ResultU16Info;
|
||||||
use satrs_mib::resultcode;
|
use satrs_mib::resultcode;
|
||||||
@ -39,6 +40,9 @@ pub enum GroupId {
|
|||||||
Action = 3,
|
Action = 3,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const TEST_EVENT: EventU32TypedSev<SeverityInfo> =
|
||||||
|
EventU32TypedSev::<SeverityInfo>::new(GroupId::Tmtc as u16, 0);
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
pub static ref HOME_PATH: PathBuf = {
|
pub static ref HOME_PATH: PathBuf = {
|
||||||
let mut home_path = PathBuf::new();
|
let mut home_path = PathBuf::new();
|
||||||
@ -280,6 +284,7 @@ pub mod tasks {
|
|||||||
pub const FREQ_MS_AOCS: u64 = 500;
|
pub const FREQ_MS_AOCS: u64 = 500;
|
||||||
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
pub const FREQ_MS_PUS_STACK: u64 = 200;
|
||||||
pub const FREQ_MS_CTRL: u64 = 400;
|
pub const FREQ_MS_CTRL: u64 = 400;
|
||||||
|
pub const FREQ_MS_CAMERA_HANDLING: u64 = 400;
|
||||||
|
|
||||||
pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
|
pub const STOP_CHECK_FREQUENCY_MS: u64 = 400;
|
||||||
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);
|
pub const STOP_CHECK_FREQUENCY: Duration = Duration::from_millis(STOP_CHECK_FREQUENCY_MS);
|
||||||
|
287
src/events.rs
Normal file
287
src/events.rs
Normal file
@ -0,0 +1,287 @@
|
|||||||
|
use std::sync::mpsc::{self};
|
||||||
|
|
||||||
|
use crate::pus::create_verification_reporter;
|
||||||
|
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
||||||
|
use satrs::event_man::{EventMessageU32, EventRoutingError};
|
||||||
|
use satrs::params::WritableToBeBytes;
|
||||||
|
use satrs::pus::event::EventTmHookProvider;
|
||||||
|
use satrs::pus::verification::VerificationReporter;
|
||||||
|
use satrs::request::UniqueApidTargetId;
|
||||||
|
use satrs::tmtc::PacketAsVec;
|
||||||
|
use satrs::{
|
||||||
|
event_man::{EventManagerWithBoundedMpsc, EventSendProvider, EventU32SenderMpscBounded},
|
||||||
|
pus::{
|
||||||
|
event_man::{
|
||||||
|
DefaultPusEventU32TmCreator, EventReporter, EventRequest, EventRequestWithToken,
|
||||||
|
},
|
||||||
|
verification::{TcStateStarted, VerificationReportingProvider, VerificationToken},
|
||||||
|
},
|
||||||
|
spacepackets::time::cds::CdsTime,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ops_sat_rs::update_time;
|
||||||
|
|
||||||
|
// This helper sets the APID of the event sender for the PUS telemetry.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct EventApidSetter {
|
||||||
|
pub next_apid: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventTmHookProvider for EventApidSetter {
|
||||||
|
fn modify_tm(&self, tm: &mut satrs::spacepackets::ecss::tm::PusTmCreator) {
|
||||||
|
tm.set_apid(self.next_apid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The PUS event handler subscribes for all events and converts them into ECSS PUS 5 event
|
||||||
|
/// packets. It also handles the verification completion of PUS event service requests.
|
||||||
|
pub struct PusEventHandler {
|
||||||
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
|
pus_event_tm_creator: DefaultPusEventU32TmCreator<EventApidSetter>,
|
||||||
|
pus_event_man_rx: mpsc::Receiver<EventMessageU32>,
|
||||||
|
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||||
|
time_provider: CdsTime,
|
||||||
|
timestamp: [u8; 7],
|
||||||
|
verif_handler: VerificationReporter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PusEventHandler {
|
||||||
|
pub fn new(
|
||||||
|
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||||
|
verif_handler: VerificationReporter,
|
||||||
|
event_manager: &mut EventManagerWithBoundedMpsc,
|
||||||
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
|
) -> Self {
|
||||||
|
let event_queue_cap = 30;
|
||||||
|
let (pus_event_man_tx, pus_event_man_rx) = mpsc::sync_channel(event_queue_cap);
|
||||||
|
|
||||||
|
// All events sent to the manager are routed to the PUS event manager, which generates PUS event
|
||||||
|
// telemetry for each event.
|
||||||
|
let event_reporter = EventReporter::new_with_hook(
|
||||||
|
PUS_EVENT_MANAGEMENT.raw(),
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
128,
|
||||||
|
EventApidSetter::default(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let pus_event_dispatcher =
|
||||||
|
DefaultPusEventU32TmCreator::new_with_default_backend(event_reporter);
|
||||||
|
let pus_event_man_send_provider = EventU32SenderMpscBounded::new(
|
||||||
|
PUS_EVENT_MANAGEMENT.raw(),
|
||||||
|
pus_event_man_tx,
|
||||||
|
event_queue_cap,
|
||||||
|
);
|
||||||
|
|
||||||
|
event_manager.subscribe_all(pus_event_man_send_provider.target_id());
|
||||||
|
event_manager.add_sender(pus_event_man_send_provider);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
event_request_rx,
|
||||||
|
pus_event_tm_creator: pus_event_dispatcher,
|
||||||
|
pus_event_man_rx,
|
||||||
|
time_provider: CdsTime::new_with_u16_days(0, 0),
|
||||||
|
timestamp: [0; 7],
|
||||||
|
verif_handler,
|
||||||
|
tm_sender,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handle_event_requests(&mut self) {
|
||||||
|
let report_completion = |event_req: EventRequestWithToken, timestamp: &[u8]| {
|
||||||
|
let started_token: VerificationToken<TcStateStarted> = event_req
|
||||||
|
.token
|
||||||
|
.try_into()
|
||||||
|
.expect("expected start verification token");
|
||||||
|
self.verif_handler
|
||||||
|
.completion_success(&self.tm_sender, started_token, timestamp)
|
||||||
|
.expect("Sending completion success failed");
|
||||||
|
};
|
||||||
|
loop {
|
||||||
|
// handle event requests
|
||||||
|
match self.event_request_rx.try_recv() {
|
||||||
|
Ok(event_req) => match event_req.request {
|
||||||
|
EventRequest::Enable(event) => {
|
||||||
|
self.pus_event_tm_creator
|
||||||
|
.enable_tm_for_event(&event)
|
||||||
|
.expect("Enabling TM failed");
|
||||||
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
|
report_completion(event_req, &self.timestamp);
|
||||||
|
}
|
||||||
|
EventRequest::Disable(event) => {
|
||||||
|
self.pus_event_tm_creator
|
||||||
|
.disable_tm_for_event(&event)
|
||||||
|
.expect("Disabling TM failed");
|
||||||
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
|
report_completion(event_req, &self.timestamp);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::TryRecvError::Empty => break,
|
||||||
|
mpsc::TryRecvError::Disconnected => {
|
||||||
|
log::warn!("all event request senders have disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn generate_pus_event_tm(&mut self) {
|
||||||
|
loop {
|
||||||
|
// Perform the generation of PUS event packets
|
||||||
|
match self.pus_event_man_rx.try_recv() {
|
||||||
|
Ok(event_msg) => {
|
||||||
|
update_time(&mut self.time_provider, &mut self.timestamp);
|
||||||
|
let param_vec = event_msg.params().map_or(Vec::new(), |param| {
|
||||||
|
param.to_vec().expect("failed to convert params to vec")
|
||||||
|
});
|
||||||
|
// We use the TM modification hook to set the sender APID for each event.
|
||||||
|
self.pus_event_tm_creator.reporter.tm_hook.next_apid =
|
||||||
|
UniqueApidTargetId::from(event_msg.sender_id()).apid;
|
||||||
|
self.pus_event_tm_creator
|
||||||
|
.generate_pus_event_tm_generic(
|
||||||
|
&self.tm_sender,
|
||||||
|
&self.timestamp,
|
||||||
|
event_msg.event(),
|
||||||
|
Some(¶m_vec),
|
||||||
|
)
|
||||||
|
.expect("Sending TM as event failed");
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::TryRecvError::Empty => break,
|
||||||
|
mpsc::TryRecvError::Disconnected => {
|
||||||
|
log::warn!("All event senders have disconnected");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EventHandler {
|
||||||
|
pub pus_event_handler: PusEventHandler,
|
||||||
|
event_manager: EventManagerWithBoundedMpsc,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventHandler {
|
||||||
|
pub fn new(
|
||||||
|
tm_sender: mpsc::Sender<PacketAsVec>,
|
||||||
|
event_rx: mpsc::Receiver<EventMessageU32>,
|
||||||
|
event_request_rx: mpsc::Receiver<EventRequestWithToken>,
|
||||||
|
) -> Self {
|
||||||
|
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
|
||||||
|
let pus_event_handler = PusEventHandler::new(
|
||||||
|
tm_sender,
|
||||||
|
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
||||||
|
&mut event_manager,
|
||||||
|
event_request_rx,
|
||||||
|
);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
pus_event_handler,
|
||||||
|
event_manager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn event_manager(&mut self) -> &mut EventManagerWithBoundedMpsc {
|
||||||
|
&mut self.event_manager
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn periodic_operation(&mut self) {
|
||||||
|
self.pus_event_handler.handle_event_requests();
|
||||||
|
self.try_event_routing();
|
||||||
|
self.pus_event_handler.generate_pus_event_tm();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_event_routing(&mut self) {
|
||||||
|
let error_handler = |event_msg: &EventMessageU32, error: EventRoutingError| {
|
||||||
|
self.routing_error_handler(event_msg, error)
|
||||||
|
};
|
||||||
|
// Perform the event routing.
|
||||||
|
self.event_manager.try_event_handling(error_handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn routing_error_handler(&self, event_msg: &EventMessageU32, error: EventRoutingError) {
|
||||||
|
log::warn!("event routing error for event {event_msg:?}: {error:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use satrs::{
|
||||||
|
events::EventU32,
|
||||||
|
pus::verification::VerificationReporterCfg,
|
||||||
|
spacepackets::{
|
||||||
|
ecss::{tm::PusTmReader, PusPacket},
|
||||||
|
CcsdsPacket,
|
||||||
|
},
|
||||||
|
tmtc::PacketAsVec,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
const TEST_CREATOR_ID: UniqueApidTargetId = UniqueApidTargetId::new(1, 2);
|
||||||
|
const TEST_EVENT: EventU32 = EventU32::new(satrs::events::Severity::Info, 1, 1);
|
||||||
|
|
||||||
|
pub struct EventManagementTestbench {
|
||||||
|
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||||
|
pub event_manager: EventManagerWithBoundedMpsc,
|
||||||
|
pub tm_receiver: mpsc::Receiver<PacketAsVec>,
|
||||||
|
pub pus_event_handler: PusEventHandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventManagementTestbench {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let (event_tx, event_rx) = mpsc::sync_channel(10);
|
||||||
|
let (_event_req_tx, event_req_rx) = mpsc::sync_channel(10);
|
||||||
|
let (tm_sender, tm_receiver) = mpsc::channel();
|
||||||
|
let verif_reporter_cfg = VerificationReporterCfg::new(0x05, 2, 2, 128).unwrap();
|
||||||
|
let verif_reporter =
|
||||||
|
VerificationReporter::new(PUS_EVENT_MANAGEMENT.id(), &verif_reporter_cfg);
|
||||||
|
let mut event_manager = EventManagerWithBoundedMpsc::new(event_rx);
|
||||||
|
let pus_event_handler =
|
||||||
|
PusEventHandler::new(tm_sender, verif_reporter, &mut event_manager, event_req_rx);
|
||||||
|
Self {
|
||||||
|
event_tx,
|
||||||
|
tm_receiver,
|
||||||
|
event_manager,
|
||||||
|
pus_event_handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_basic_event_generation() {
|
||||||
|
let mut testbench = EventManagementTestbench::new();
|
||||||
|
testbench
|
||||||
|
.event_tx
|
||||||
|
.send(EventMessageU32::new(
|
||||||
|
TEST_CREATOR_ID.id(),
|
||||||
|
EventU32::new(satrs::events::Severity::Info, 1, 1),
|
||||||
|
))
|
||||||
|
.expect("failed to send event");
|
||||||
|
testbench.pus_event_handler.handle_event_requests();
|
||||||
|
testbench.event_manager.try_event_handling(|_, _| {});
|
||||||
|
testbench.pus_event_handler.generate_pus_event_tm();
|
||||||
|
let tm_packet = testbench
|
||||||
|
.tm_receiver
|
||||||
|
.try_recv()
|
||||||
|
.expect("failed to receive TM packet");
|
||||||
|
assert_eq!(tm_packet.sender_id, PUS_EVENT_MANAGEMENT.id());
|
||||||
|
let tm_reader = PusTmReader::new(&tm_packet.packet, 7)
|
||||||
|
.expect("failed to create TM reader")
|
||||||
|
.0;
|
||||||
|
assert_eq!(tm_reader.apid(), TEST_CREATOR_ID.apid);
|
||||||
|
assert_eq!(tm_reader.user_data().len(), 4);
|
||||||
|
let event_read_back = EventU32::from_be_bytes(tm_reader.user_data().try_into().unwrap());
|
||||||
|
assert_eq!(event_read_back, TEST_EVENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_basic_event_disabled() {
|
||||||
|
// TODO: Add test.
|
||||||
|
}
|
||||||
|
}
|
@ -44,7 +44,6 @@ pub struct TcpSppClientCommon {
|
|||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
impl TcpSppClientCommon {
|
impl TcpSppClientCommon {
|
||||||
pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
|
pub fn handle_read_bytstream(&mut self, read_bytes: usize) -> Result<(), ClientError> {
|
||||||
let mut dummy = 0;
|
|
||||||
if SPP_CLIENT_WIRETAPPING_RX {
|
if SPP_CLIENT_WIRETAPPING_RX {
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"SPP TCP RX {} bytes: {:x?}",
|
"SPP TCP RX {} bytes: {:x?}",
|
||||||
@ -54,11 +53,10 @@ impl TcpSppClientCommon {
|
|||||||
}
|
}
|
||||||
// This parser is able to deal with broken tail packets, but we ignore those for now..
|
// This parser is able to deal with broken tail packets, but we ignore those for now..
|
||||||
parse_buffer_for_ccsds_space_packets(
|
parse_buffer_for_ccsds_space_packets(
|
||||||
&mut self.read_buf[..read_bytes],
|
&self.read_buf[..read_bytes],
|
||||||
&self.validator,
|
&self.validator,
|
||||||
self.id,
|
self.id,
|
||||||
&self.tc_source_tx,
|
&self.tc_source_tx,
|
||||||
&mut dummy,
|
|
||||||
)?;
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -32,3 +32,12 @@ impl Default for TimeStampHelper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update_time(time_provider: &mut CdsTime, timestamp: &mut [u8]) {
|
||||||
|
time_provider
|
||||||
|
.update_from_now()
|
||||||
|
.expect("Could not get current time");
|
||||||
|
time_provider
|
||||||
|
.write_to_bytes(timestamp)
|
||||||
|
.expect("Writing timestamp failed");
|
||||||
|
}
|
||||||
|
72
src/main.rs
72
src/main.rs
@ -6,25 +6,31 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use log::info;
|
use log::info;
|
||||||
use ops_sat_rs::config::components::CAMERA_HANDLER;
|
|
||||||
use ops_sat_rs::config::{
|
use ops_sat_rs::config::{
|
||||||
cfg_file::create_app_config,
|
cfg_file::create_app_config,
|
||||||
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
|
components::{CONTROLLER_ID, TCP_SERVER, TCP_SPP_CLIENT, UDP_SERVER},
|
||||||
pool::create_sched_tc_pool,
|
pool::create_sched_tc_pool,
|
||||||
tasks::{FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
tasks::{FREQ_MS_CAMERA_HANDLING, FREQ_MS_CTRL, FREQ_MS_PUS_STACK, STOP_CHECK_FREQUENCY},
|
||||||
VALID_PACKET_ID_LIST,
|
VALID_PACKET_ID_LIST,
|
||||||
};
|
};
|
||||||
|
use ops_sat_rs::config::{components::CAMERA_HANDLER, tasks::FREQ_MS_EVENT_HANDLING};
|
||||||
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
|
use ops_sat_rs::config::{tasks::FREQ_MS_UDP_TMTC, OBSW_SERVER_ADDR, SERVER_PORT};
|
||||||
use ops_sat_rs::TimeStampHelper;
|
use ops_sat_rs::TimeStampHelper;
|
||||||
use satrs::hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer};
|
use satrs::{
|
||||||
|
hal::std::{tcp_server::ServerConfig, udp_server::UdpTcServer},
|
||||||
use crate::handlers::camera::IMS100BatchHandler;
|
pus::event_man::EventRequestWithToken,
|
||||||
use crate::pus::{
|
|
||||||
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
|
|
||||||
PusTcDistributor, PusTcMpscRouter,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::tmtc::tm_sink::TmFunnelDynamic;
|
use crate::tmtc::tm_sink::TmFunnelDynamic;
|
||||||
use crate::{controller::ExperimentController, pus::test::create_test_service};
|
use crate::{controller::ExperimentController, pus::test::create_test_service};
|
||||||
|
use crate::{
|
||||||
|
events::EventHandler,
|
||||||
|
pus::{
|
||||||
|
hk::create_hk_service, mode::create_mode_service, scheduler::create_scheduler_service,
|
||||||
|
PusTcDistributor, PusTcMpscRouter,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use crate::{handlers::camera::IMS100BatchHandler, pus::event::create_event_service};
|
||||||
use crate::{
|
use crate::{
|
||||||
interface::tcp_server::{SyncTcpTmSource, TcpTask},
|
interface::tcp_server::{SyncTcpTmSource, TcpTask},
|
||||||
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
|
interface::udp_server::{DynamicUdpTmHandler, UdpTmtcServer},
|
||||||
@ -37,6 +43,7 @@ use crate::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
mod controller;
|
mod controller;
|
||||||
|
mod events;
|
||||||
mod handlers;
|
mod handlers;
|
||||||
mod interface;
|
mod interface;
|
||||||
mod logger;
|
mod logger;
|
||||||
@ -59,12 +66,21 @@ fn main() {
|
|||||||
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
|
let (tm_tcp_client_tx, tm_tcp_client_rx) = mpsc::channel();
|
||||||
|
|
||||||
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
let (pus_test_tx, pus_test_rx) = mpsc::channel();
|
||||||
// let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
let (pus_event_tx, pus_event_rx) = mpsc::channel();
|
||||||
let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
|
let (pus_sched_tx, pus_sched_rx) = mpsc::channel();
|
||||||
let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
|
let (pus_hk_tx, pus_hk_rx) = mpsc::channel();
|
||||||
let (pus_action_tx, pus_action_rx) = mpsc::channel();
|
let (pus_action_tx, pus_action_rx) = mpsc::channel();
|
||||||
let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
|
let (pus_mode_tx, pus_mode_rx) = mpsc::channel();
|
||||||
|
|
||||||
|
// Create event handling components
|
||||||
|
// These sender handles are used to send event requests, for example to enable or disable
|
||||||
|
// certain events.
|
||||||
|
let (event_tx, event_rx) = mpsc::sync_channel(100);
|
||||||
|
let (event_request_tx, event_request_rx) = mpsc::channel::<EventRequestWithToken>();
|
||||||
|
// The event task is the core handler to perform the event routing and TM handling as specified
|
||||||
|
// in the sat-rs documentation.
|
||||||
|
let mut event_handler = EventHandler::new(tm_funnel_tx.clone(), event_rx, event_request_rx);
|
||||||
|
|
||||||
let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
|
let (pus_action_reply_tx, pus_action_reply_rx) = mpsc::channel();
|
||||||
let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
|
let (_pus_hk_reply_tx, pus_hk_reply_rx) = mpsc::channel();
|
||||||
let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
|
let (_pus_mode_reply_tx, pus_mode_reply_rx) = mpsc::channel();
|
||||||
@ -83,27 +99,22 @@ fn main() {
|
|||||||
|
|
||||||
let pus_router = PusTcMpscRouter {
|
let pus_router = PusTcMpscRouter {
|
||||||
test_tc_sender: pus_test_tx,
|
test_tc_sender: pus_test_tx,
|
||||||
// event_tc_sender: pus_event_tx,
|
event_tc_sender: pus_event_tx,
|
||||||
sched_tc_sender: pus_sched_tx,
|
sched_tc_sender: pus_sched_tx,
|
||||||
hk_tc_sender: pus_hk_tx,
|
hk_tc_sender: pus_hk_tx,
|
||||||
action_tc_sender: pus_action_tx,
|
action_tc_sender: pus_action_tx,
|
||||||
mode_tc_sender: pus_mode_tx,
|
mode_tc_sender: pus_mode_tx,
|
||||||
};
|
};
|
||||||
|
|
||||||
let pus_test_service = create_test_service(
|
let pus_test_service = create_test_service(tm_funnel_tx.clone(), event_tx.clone(), pus_test_rx);
|
||||||
tm_funnel_tx.clone(),
|
|
||||||
// event_handler.clone_event_sender(),
|
|
||||||
pus_test_rx,
|
|
||||||
);
|
|
||||||
let pus_scheduler_service = create_scheduler_service(
|
let pus_scheduler_service = create_scheduler_service(
|
||||||
tm_funnel_tx.clone(),
|
tm_funnel_tx.clone(),
|
||||||
tc_source_tx.clone(),
|
tc_source_tx.clone(),
|
||||||
pus_sched_rx,
|
pus_sched_rx,
|
||||||
create_sched_tc_pool(),
|
create_sched_tc_pool(),
|
||||||
);
|
);
|
||||||
//
|
let pus_event_service =
|
||||||
// let pus_event_service =
|
create_event_service(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
||||||
// create_event_service_dynamic(tm_funnel_tx.clone(), pus_event_rx, event_request_tx);
|
|
||||||
let pus_action_service = create_action_service(
|
let pus_action_service = create_action_service(
|
||||||
tm_funnel_tx.clone(),
|
tm_funnel_tx.clone(),
|
||||||
pus_action_rx,
|
pus_action_rx,
|
||||||
@ -125,7 +136,7 @@ fn main() {
|
|||||||
let mut pus_stack = PusStack::new(
|
let mut pus_stack = PusStack::new(
|
||||||
pus_test_service,
|
pus_test_service,
|
||||||
pus_hk_service,
|
pus_hk_service,
|
||||||
// pus_event_service,
|
pus_event_service,
|
||||||
pus_action_service,
|
pus_action_service,
|
||||||
pus_scheduler_service,
|
pus_scheduler_service,
|
||||||
pus_mode_service,
|
pus_mode_service,
|
||||||
@ -163,7 +174,7 @@ fn main() {
|
|||||||
)
|
)
|
||||||
.expect("tcp server creation failed");
|
.expect("tcp server creation failed");
|
||||||
|
|
||||||
let mut tm_funnel = TmFunnelDynamic::new(
|
let mut tm_sink = TmFunnelDynamic::new(
|
||||||
sync_tm_tcp_source,
|
sync_tm_tcp_source,
|
||||||
tm_funnel_rx,
|
tm_funnel_rx,
|
||||||
tm_tcp_server_tx,
|
tm_tcp_server_tx,
|
||||||
@ -270,17 +281,26 @@ fn main() {
|
|||||||
|
|
||||||
// TM Funnel Task
|
// TM Funnel Task
|
||||||
info!("Starting TM funnel task");
|
info!("Starting TM funnel task");
|
||||||
let funnel_stop_signal = stop_signal.clone();
|
let tm_sink_stop_signal = stop_signal.clone();
|
||||||
let jh_tm_funnel = thread::Builder::new()
|
let jh_tm_funnel = thread::Builder::new()
|
||||||
.name("ops-sat tm-funnel".to_string())
|
.name("ops-sat tm-sink".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
tm_funnel.operation();
|
tm_sink.operation();
|
||||||
if funnel_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
if tm_sink_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
info!("Starting event handling task");
|
||||||
|
let jh_event_handling = thread::Builder::new()
|
||||||
|
.name("sat-rs events".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
event_handler.periodic_operation();
|
||||||
|
thread::sleep(Duration::from_millis(FREQ_MS_EVENT_HANDLING));
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// PUS Handler Task
|
// PUS Handler Task
|
||||||
info!("Starting PUS handlers task");
|
info!("Starting PUS handlers task");
|
||||||
let pus_stop_signal = stop_signal.clone();
|
let pus_stop_signal = stop_signal.clone();
|
||||||
@ -305,6 +325,7 @@ fn main() {
|
|||||||
if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
if camera_stop_signal.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
thread::sleep(Duration::from_millis(FREQ_MS_CAMERA_HANDLING));
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -327,6 +348,9 @@ fn main() {
|
|||||||
jh_pus_handler
|
jh_pus_handler
|
||||||
.join()
|
.join()
|
||||||
.expect("Joining PUS handlers thread failed");
|
.expect("Joining PUS handlers thread failed");
|
||||||
|
jh_event_handling
|
||||||
|
.join()
|
||||||
|
.expect("Joining PUS handlers thread failed");
|
||||||
jh_camera_handler
|
jh_camera_handler
|
||||||
.join()
|
.join()
|
||||||
.expect("Joining camera handler thread failed");
|
.expect("Joining camera handler thread failed");
|
||||||
|
66
src/pus/event.rs
Normal file
66
src/pus/event.rs
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
use super::HandlingStatus;
|
||||||
|
use crate::pus::create_verification_reporter;
|
||||||
|
use log::{error, warn};
|
||||||
|
use ops_sat_rs::config::components::PUS_EVENT_MANAGEMENT;
|
||||||
|
use satrs::pus::event_man::EventRequestWithToken;
|
||||||
|
use satrs::pus::event_srv::PusEventServiceHandler;
|
||||||
|
use satrs::pus::verification::VerificationReporter;
|
||||||
|
use satrs::pus::{
|
||||||
|
EcssTcAndToken, EcssTcInVecConverter, MpscTcReceiver, PusPacketHandlerResult, PusServiceHelper,
|
||||||
|
};
|
||||||
|
use satrs::tmtc::PacketAsVec;
|
||||||
|
|
||||||
|
pub fn create_event_service(
|
||||||
|
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||||
|
pus_event_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||||
|
event_request_tx: mpsc::Sender<EventRequestWithToken>,
|
||||||
|
) -> EventServiceWrapper {
|
||||||
|
let pus_5_handler = PusEventServiceHandler::new(
|
||||||
|
PusServiceHelper::new(
|
||||||
|
PUS_EVENT_MANAGEMENT.id(),
|
||||||
|
pus_event_rx,
|
||||||
|
tm_funnel_tx,
|
||||||
|
create_verification_reporter(PUS_EVENT_MANAGEMENT.id(), PUS_EVENT_MANAGEMENT.apid),
|
||||||
|
EcssTcInVecConverter::default(),
|
||||||
|
),
|
||||||
|
event_request_tx,
|
||||||
|
);
|
||||||
|
EventServiceWrapper {
|
||||||
|
handler: pus_5_handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EventServiceWrapper {
|
||||||
|
pub handler: PusEventServiceHandler<
|
||||||
|
MpscTcReceiver,
|
||||||
|
mpsc::Sender<PacketAsVec>,
|
||||||
|
EcssTcInVecConverter,
|
||||||
|
VerificationReporter,
|
||||||
|
>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventServiceWrapper {
|
||||||
|
pub fn poll_and_handle_next_tc(&mut self, time_stamp: &[u8]) -> HandlingStatus {
|
||||||
|
match self.handler.poll_and_handle_next_tc(time_stamp) {
|
||||||
|
Ok(result) => match result {
|
||||||
|
PusPacketHandlerResult::RequestHandled => {}
|
||||||
|
PusPacketHandlerResult::RequestHandledPartialSuccess(e) => {
|
||||||
|
warn!("PUS 5 partial packet handling success: {e:?}")
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::CustomSubservice(invalid, _) => {
|
||||||
|
warn!("PUS 5 invalid subservice {invalid}");
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::SubserviceNotImplemented(subservice, _) => {
|
||||||
|
warn!("PUS 5 subservice {subservice} not implemented");
|
||||||
|
}
|
||||||
|
PusPacketHandlerResult::Empty => return HandlingStatus::Empty,
|
||||||
|
},
|
||||||
|
Err(error) => {
|
||||||
|
error!("PUS packet handling error: {error:?}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
HandlingStatus::HandledOne
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
pub mod action;
|
pub mod action;
|
||||||
|
pub mod event;
|
||||||
pub mod hk;
|
pub mod hk;
|
||||||
pub mod mode;
|
pub mod mode;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
@ -46,7 +47,7 @@ pub fn create_verification_reporter(owner_id: ComponentId, apid: Apid) -> Verifi
|
|||||||
/// Simple router structure which forwards PUS telecommands to dedicated handlers.
|
/// Simple router structure which forwards PUS telecommands to dedicated handlers.
|
||||||
pub struct PusTcMpscRouter {
|
pub struct PusTcMpscRouter {
|
||||||
pub test_tc_sender: Sender<EcssTcAndToken>,
|
pub test_tc_sender: Sender<EcssTcAndToken>,
|
||||||
// pub event_tc_sender: Sender<EcssTcAndToken>,
|
pub event_tc_sender: Sender<EcssTcAndToken>,
|
||||||
pub sched_tc_sender: Sender<EcssTcAndToken>,
|
pub sched_tc_sender: Sender<EcssTcAndToken>,
|
||||||
pub hk_tc_sender: Sender<EcssTcAndToken>,
|
pub hk_tc_sender: Sender<EcssTcAndToken>,
|
||||||
pub action_tc_sender: Sender<EcssTcAndToken>,
|
pub action_tc_sender: Sender<EcssTcAndToken>,
|
||||||
@ -109,16 +110,16 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
|||||||
tc_in_memory,
|
tc_in_memory,
|
||||||
token: Some(accepted_token.into()),
|
token: Some(accepted_token.into()),
|
||||||
})?,
|
})?,
|
||||||
// PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
|
PusServiceId::Event => self.pus_router.event_tc_sender.send(EcssTcAndToken {
|
||||||
// tc_in_memory,
|
tc_in_memory,
|
||||||
// token: Some(accepted_token.into()),
|
token: Some(accepted_token.into()),
|
||||||
// })?,
|
})?,
|
||||||
// PusServiceId::Scheduling => {
|
PusServiceId::Scheduling => {
|
||||||
// self.pus_router.sched_tc_sender.send(EcssTcAndToken {
|
self.pus_router.sched_tc_sender.send(EcssTcAndToken {
|
||||||
// tc_in_memory,
|
tc_in_memory,
|
||||||
// token: Some(accepted_token.into()),
|
token: Some(accepted_token.into()),
|
||||||
// })?
|
})?
|
||||||
// }
|
}
|
||||||
_ => {
|
_ => {
|
||||||
let result = self.verif_reporter.start_failure(
|
let result = self.verif_reporter.start_failure(
|
||||||
&self.tm_sender,
|
&self.tm_sender,
|
||||||
@ -138,10 +139,10 @@ impl<TmSender: EcssTmSender> PusTcDistributor<TmSender> {
|
|||||||
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
|
if let Ok(custom_service) = CustomPusServiceId::try_from(e.number) {
|
||||||
match custom_service {
|
match custom_service {
|
||||||
CustomPusServiceId::Mode => {
|
CustomPusServiceId::Mode => {
|
||||||
// self.pus_router.mode_tc_sender.send(EcssTcAndToken {
|
self.pus_router.mode_tc_sender.send(EcssTcAndToken {
|
||||||
// tc_in_memory,
|
tc_in_memory,
|
||||||
// token: Some(accepted_token.into()),
|
token: Some(accepted_token.into()),
|
||||||
// })?
|
})?
|
||||||
}
|
}
|
||||||
CustomPusServiceId::Health => {}
|
CustomPusServiceId::Health => {}
|
||||||
}
|
}
|
||||||
|
@ -4,15 +4,15 @@ use derive_new::new;
|
|||||||
use satrs::spacepackets::time::{cds, TimeWriter};
|
use satrs::spacepackets::time::{cds, TimeWriter};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
action::ActionServiceWrapper, hk::HkServiceWrapper, mode::ModeServiceWrapper,
|
action::ActionServiceWrapper, event::EventServiceWrapper, hk::HkServiceWrapper,
|
||||||
scheduler::SchedulingService, TargetedPusService,
|
mode::ModeServiceWrapper, scheduler::SchedulingService, TargetedPusService,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(new)]
|
#[derive(new)]
|
||||||
pub struct PusStack {
|
pub struct PusStack {
|
||||||
test_srv: TestCustomServiceWrapper,
|
test_srv: TestCustomServiceWrapper,
|
||||||
hk_srv_wrapper: HkServiceWrapper,
|
hk_srv_wrapper: HkServiceWrapper,
|
||||||
// event_srv: EventServiceWrapper<TmSender, TcInMemConverter>,
|
event_srv: EventServiceWrapper,
|
||||||
action_srv_wrapper: ActionServiceWrapper,
|
action_srv_wrapper: ActionServiceWrapper,
|
||||||
schedule_srv: SchedulingService,
|
schedule_srv: SchedulingService,
|
||||||
mode_srv: ModeServiceWrapper,
|
mode_srv: ModeServiceWrapper,
|
||||||
@ -50,7 +50,7 @@ impl PusStack {
|
|||||||
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
self.schedule_srv.poll_and_handle_next_tc(&time_stamp),
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
// is_srv_finished(self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
is_srv_finished(5, self.event_srv.poll_and_handle_next_tc(&time_stamp), None);
|
||||||
is_srv_finished(
|
is_srv_finished(
|
||||||
8,
|
8,
|
||||||
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
self.action_srv_wrapper.poll_and_handle_next_tc(&time_stamp),
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
use crate::pus::create_verification_reporter;
|
use crate::pus::create_verification_reporter;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
|
use ops_sat_rs::config::components::PUS_TEST_SERVICE;
|
||||||
use ops_sat_rs::config::tmtc_err;
|
use ops_sat_rs::config::{tmtc_err, TEST_EVENT};
|
||||||
// use satrs::event_man::{EventMessage, EventMessageU32};
|
use satrs::event_man::{EventMessage, EventMessageU32};
|
||||||
use satrs::pus::test::PusService17TestHandler;
|
use satrs::pus::test::PusService17TestHandler;
|
||||||
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
|
use satrs::pus::verification::{FailParams, VerificationReporter, VerificationReportingProvider};
|
||||||
use satrs::pus::{
|
use satrs::pus::{
|
||||||
@ -20,6 +20,7 @@ use super::HandlingStatus;
|
|||||||
|
|
||||||
pub fn create_test_service(
|
pub fn create_test_service(
|
||||||
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
tm_funnel_tx: mpsc::Sender<PacketAsVec>,
|
||||||
|
event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||||
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
pus_test_rx: mpsc::Receiver<EcssTcAndToken>,
|
||||||
) -> TestCustomServiceWrapper {
|
) -> TestCustomServiceWrapper {
|
||||||
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
let pus17_handler = PusService17TestHandler::new(PusServiceHelper::new(
|
||||||
@ -31,7 +32,7 @@ pub fn create_test_service(
|
|||||||
));
|
));
|
||||||
TestCustomServiceWrapper {
|
TestCustomServiceWrapper {
|
||||||
handler: pus17_handler,
|
handler: pus17_handler,
|
||||||
// test_srv_event_sender: event_sender,
|
event_tx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,6 +43,7 @@ pub struct TestCustomServiceWrapper {
|
|||||||
EcssTcInVecConverter,
|
EcssTcInVecConverter,
|
||||||
VerificationReporter,
|
VerificationReporter,
|
||||||
>,
|
>,
|
||||||
|
pub event_tx: mpsc::SyncSender<EventMessageU32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestCustomServiceWrapper {
|
impl TestCustomServiceWrapper {
|
||||||
@ -79,9 +81,9 @@ impl TestCustomServiceWrapper {
|
|||||||
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
|
time_stamper.write_to_bytes(&mut stamp_buf).unwrap();
|
||||||
if subservice == 128 {
|
if subservice == 128 {
|
||||||
info!("Generating test event");
|
info!("Generating test event");
|
||||||
// self.test_srv_event_sender
|
self.event_tx
|
||||||
// .send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
.send(EventMessage::new(PUS_TEST_SERVICE.id(), TEST_EVENT.into()))
|
||||||
// .expect("Sending test event failed");
|
.expect("Sending test event failed");
|
||||||
let start_token = self
|
let start_token = self
|
||||||
.handler
|
.handler
|
||||||
.service_helper
|
.service_helper
|
||||||
|
Loading…
Reference in New Issue
Block a user