use crate::can_ids::{ can_id_to_package_id, package_id_to_can_id, DeviceId, PackageId, PackageModel, SenderReceiverThread, ThreadId, }; use embedded_can::{self, Frame}; use log::{debug, warn}; use socketcan::{errors, frame, socket, CanFrame, Socket}; use std::collections::HashMap; use std::io; use std::sync::mpsc::{Receiver, Sender}; pub struct CanRxHandler { interface: &'static str, socket: socket::CanSocket, //frame_id_to_sender_id: HashMap, // double hash map: frame id -> receiver id -> sender handle can_senders: HashMap>, //dismissed_ids: Vec, package_map: HashMap, //packet_id_to_sender_id: HashMap, } impl CanRxHandler { pub fn new_socket( interface: &'static str, //frame_id_to_sender_id: HashMap, can_senders: HashMap>, package_map: HashMap, ) -> Result { let socket = socket::CanSocket::open(&interface); if let Ok(socket) = socket { Ok(CanRxHandler { interface, socket, can_senders, package_map, }) } else { Err(()) } } pub fn process_incoming(&mut self) { if let Some(frame) = self.rx_socket() { self.forward_frame(frame); } } pub fn read_frame(&self) -> io::Result { let frame = self.socket.read_frame(); if let Err(e) = &frame { warn!("CAN bus read error: {}", e); } frame } pub fn rx_socket(&self) -> Option { let frame = self.socket.read_frame().ok()?; debug!("Can Frame read: {:?}.", frame); return Some(frame); } pub fn forward_frame(&self, frame: CanFrame) { let frame_id = can_id_to_package_id(frame.id()); debug!("Frame forwarding with id: {:?}", frame_id); if let Some(frame_id) = frame_id { if self.package_map.contains_key(&frame_id) { let value = self.package_map.get(&frame_id).unwrap(); if value.get_sender() != DeviceId::OBC { let message_sender = self.can_senders.get(&value.get_thread()).unwrap(); let data = frame.data(); let message = PackageModel::new(frame_id, data).expect("Error generating message."); message_sender.send(message).expect(&*format!( "Failure sending can bus frame to thread{:?}, frame id {:?}", value.get_thread(), frame_id )); } } } } } pub struct CanTxHandler { interface: &'static str, socket: socket::CanSocket, package_map: HashMap, message_receiver: Receiver, } impl CanTxHandler { pub fn new_socket( interface: &'static str, package_map: HashMap, message_receiver: Receiver, ) -> Result { let socket = socket::CanSocket::open(&interface); if let Ok(socket) = socket { socket.filter_drop_all().unwrap(); // tx nodes cannot receive data Ok(CanTxHandler { interface, socket, package_map, message_receiver, }) } else { Err(()) } } pub fn process_incoming(&mut self) { if let Ok(package) = self.message_receiver.recv() { self.tx_socket(package.package_id(), package.data()); } } pub fn tx_socket(&self, package_id: PackageId, data: &[u8]) { if self.package_map.contains_key(&package_id) { let value = self.package_map.get(&package_id).unwrap(); if value.get_sender() == DeviceId::OBC { if data.len() <= 8 { let frame_id = package_id_to_can_id(&package_id); let frame = CanFrame::new(frame_id, data); if let Some(frame) = frame { self.socket .write_frame(&frame) .expect("Error writing frame."); } } else { warn!( "Message dismissed, data length ({:?}) exceeds 8 bytes", data.len() ); } } else { warn!( "Message dismissed, wrong sender id: {:?}", value.get_sender() ); } } else { warn!("Message dismissed, wrong package id: {:?}", package_id); } } } /*pub struct CanTxHandler { interface: &'static str, socket: socket::CanSocket, thread_id: ThreadId, package_map: HashMap, } impl CanTxHandler { pub fn new_socket( interface: &'static str, thread_id: ThreadId, package_map: HashMap, ) -> Result { let socket = socket::CanSocket::open(&interface); if let Ok(socket) = socket { socket.filter_drop_all().unwrap(); // tx nodes cannot receive data Ok(CanTxHandler { interface, socket, thread_id, package_map, }) } else { Err(()) } } pub fn tx_socket(&self, package_id: PackageId, data: &[u8]) { if self.package_map.contains_key(&package_id) { let value = self.package_map.get(&package_id).unwrap(); if value.get_sender() == DeviceId::OBC { //if value.get_thread() == self.thread_id { if data.len() <= 8 { let frame_id = package_id_to_can_id(&package_id); let frame = CanFrame::new(frame_id, data); if let Some(frame) = frame { self.socket .write_frame(&frame) .expect("Error writing frame."); } } else { warn!( "Message dismissed, data length ({:?}) exceeds 8 bytes", data.len() ); } /*} else { warn!( "Message dismissed, mismatched thread id: {:?}", value.get_thread() ); } */ } else { warn!( "Message dismissed, wrong sender id: {:?}", value.get_sender() ); } } else { warn!("Message dismissed, wrong package id: {:?}", package_id); } } /* pub fn tx_socket_from_frame(&self, frame: frame::CanFrame) -> io::Result<()> { let frame_id = frame.id(); if !self.allowed_frame_ids.contains(&frame_id) { warn!( "Requested frame Id {:?} not allowed for current thread", frame.id() ); } else if let Err(e) = self.socket.write_frame(&frame) { warn!("CAN bus write error: {}", e); } return Ok(()); } pub fn tx_socket_from_data( &self, frame_id: embedded_can::Id, frame_data: &[u8], ) -> io::Result<()> { let frame = frame::CanFrame::new(frame_id, &frame_data).expect(&*format!( "Failure sending can bus frame with id {:?}", frame_id )); self.tx_socket_from_frame(frame) } */ } */ pub fn open_socket(interface: &str) -> Result { let socket = socket::CanSocket::open(&interface); return socket; }