From 5b422d16156103c9f02ad43d2503511e214eb7fb Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 31 May 2024 15:43:54 +0200 Subject: [PATCH] example almost done --- examples/src/main.rs | 188 ++++++++++++++++++++++++++++------ libcsp-cargo-build/src/lib.rs | 2 + libcsp-rust/src/ffi.rs | 40 ++++++++ libcsp-rust/src/lib.rs | 183 +++++++++++++++++++++++++++++++-- 4 files changed, 373 insertions(+), 40 deletions(-) diff --git a/examples/src/main.rs b/examples/src/main.rs index 8c8a860..7877d43 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -1,11 +1,25 @@ -use std::{thread, time::Duration}; - -use libcsp_rust::{ - csp_accept, csp_bind, csp_init, csp_listen, csp_read, csp_route_work, CspSocket, CSP_ANY, +use std::{ + sync::{ + atomic::{AtomicBool, AtomicU32}, + Arc, + }, + thread, + time::Duration, }; -/* +use libcsp_rust::{ + csp_accept_guarded, csp_bind, csp_buffer_get, csp_conn_dport, csp_conn_print_table, + csp_connect_guarded, csp_iflist_print, csp_init, csp_listen, csp_ping, csp_read, csp_reboot, + csp_route_work, csp_send, csp_service_handler, ConnectOpts, CspSocket, MsgPriority, + SocketFlags, CSP_ANY, CSP_LOOPBACK, +}; +const MY_SERVER_PORT: i32 = 10; +const RUN_DURATION_IN_SECS: u32 = 3; + +const TEST_MODE: bool = false; + +/* #include #include #include @@ -228,11 +242,17 @@ int main(int argc, char * argv[]) { } */ -fn main() { +fn main() -> Result<(), u32> { println!("CSP server example"); // SAFETY: We only call this once. unsafe { csp_init() }; + let stop_signal = Arc::new(AtomicBool::new(false)); + let stop_signal_server = stop_signal.clone(); + let stop_signal_client = stop_signal.clone(); + let server_received = Arc::new(AtomicU32::new(0)); + let server_recv_copy = server_received.clone(); + let csp_router_jh = thread::spawn(|| loop { if let Err(e) = csp_route_work() { match e { @@ -245,12 +265,47 @@ fn main() { } }); - server(); + let csp_server_jh = thread::spawn(|| { + server(server_received, stop_signal_server); + }); + + let csp_client_jh = thread::spawn(|| { + client(stop_signal_client); + }); + + println!("CSP connection table"); + csp_conn_print_table(); + + println!("CSP interfaces"); + csp_iflist_print(); + let mut app_result = Ok(()); + // Wait for execution to end (ctrl+c) + loop { + std::thread::sleep(Duration::from_secs(RUN_DURATION_IN_SECS as u64)); + + if TEST_MODE { + // Test mode is intended for checking that host & client can exchange packets over loopback + let received_count = server_recv_copy.load(std::sync::atomic::Ordering::Relaxed); + println!("CSP: Server received {} packets", received_count); + if received_count < 5 { + stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + app_result = Err(1); + break; + } + stop_signal.store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + } csp_router_jh.join().unwrap(); + csp_server_jh.join().unwrap(); + csp_client_jh.join().unwrap(); + app_result } -fn server() { +fn server(server_received: Arc, stop_signal: Arc) { + println!("server task started"); + // Create socket with no specific socket options, e.g. accepts CRC32, HMAC, etc. if enabled // during compilation let mut csp_socket = CspSocket::default(); @@ -263,41 +318,108 @@ fn server() { // Wait for connections and then process packets on the connection loop { - // Wait for a new connection, 10000 mS timeout + if stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } - let conn = csp_accept(&mut csp_socket, Duration::from_millis(10000)); + // Wait for a new connection, 10000 mS timeout + let conn = csp_accept_guarded(&mut csp_socket, Duration::from_millis(10000)); if conn.is_none() { continue; } - let mut conn = conn.unwrap(); + let conn = conn.unwrap(); // Read packets on connection, timout is 100 mS - // csp_packet_t *packet; loop { - let packet = csp_read(&mut conn, Duration::from_millis(100)); + if stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + // SAFETY: Connection is active while we read here. + let packet = unsafe { csp_read(conn.0, Duration::from_millis(100)) }; if packet.is_none() { break; } + let mut packet = packet.unwrap(); + match csp_conn_dport(conn.0) { + MY_SERVER_PORT => { + server_received.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // Process packet here. + println!( + "packet received on MY_SERVER_PORT: {:x?}\n", + packet.packet_data() + ); + } + _ => { + csp_service_handler(&mut packet); + } + }; } - /* - while ((packet = csp_read(conn, 50)) != NULL) { - switch (csp_conn_dport(conn)) { - case MY_SERVER_PORT: - /* Process packet here */ - csp_print("Packet received on MY_SERVER_PORT: %s\n", (char *) packet->data); - csp_buffer_free(packet); - ++server_received; - break; - - default: - /* Call the default CSP service handler, handle pings, buffer use, etc. */ - csp_service_handler(packet); - break; - } - } - */ - - /* Close current connection */ - // csp_close(conn); + // No need to close, we accepted the connection with a guard. + } +} + +fn client(stop_signal: Arc) { + println!("client task started"); + let mut current_letter = 'A'; + + loop { + if stop_signal.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + if TEST_MODE { + thread::sleep(Duration::from_millis(20)); + } else { + thread::sleep(Duration::from_millis(100)); + } + // Send ping to server, timeout 1000 mS, ping size 100 bytes + if let Err(e) = csp_ping( + CSP_LOOPBACK, + Duration::from_millis(1000), + 100, + SocketFlags::NONE, + ) { + println!("ping error: {:?}", e); + } + + // Send reboot request to server, the server has no actual implementation of + // csp_sys_reboot() and fails to reboot. + csp_reboot(CSP_LOOPBACK); + println!("reboot system request sent to address: {}", CSP_LOOPBACK); + + // Send data packet (string) to server + + // 1. Connect to host on 'server_address', port MY_SERVER_PORT with regular UDP-like + // protocol and 1000 ms timeout. + let conn = csp_connect_guarded( + MsgPriority::Normal, + CSP_LOOPBACK, + MY_SERVER_PORT as u8, + Duration::from_millis(1000), + ConnectOpts::NONE, + ); + if conn.is_none() { + println!("CSP client: connection failed"); + return; + } + let conn = conn.unwrap(); + + // 2. Get packet buffer for message/data. + let packet_ref = csp_buffer_get(); + if packet_ref.is_none() { + println!("CSP client: failed to get CSP buffer"); + return; + } + let mut packet_ref = packet_ref.unwrap(); + + // 3. Copy data to packet. + let mut string_to_set = String::from("Hello world"); + string_to_set.push(' '); + string_to_set.push(current_letter); + current_letter = (current_letter as u8 + 1) as char; + string_to_set.push('\0'); + packet_ref.set_data(string_to_set.as_bytes()); + + // 4. Send data. + csp_send(conn.0, packet_ref); } } diff --git a/libcsp-cargo-build/src/lib.rs b/libcsp-cargo-build/src/lib.rs index 16d43a3..819570f 100644 --- a/libcsp-cargo-build/src/lib.rs +++ b/libcsp-cargo-build/src/lib.rs @@ -46,6 +46,8 @@ const SRCS_LIST: &[&str] = &[ "csp_port.c", "csp_route.c", "csp_dedup.c", + "csp_services.c", + "csp_service_handler.c", "interfaces/csp_if_lo.c", "interfaces/csp_if_kiss.c", "interfaces/csp_if_tun.c", diff --git a/libcsp-rust/src/ffi.rs b/libcsp-rust/src/ffi.rs index 790d609..6700c43 100644 --- a/libcsp-rust/src/ffi.rs +++ b/libcsp-rust/src/ffi.rs @@ -188,6 +188,46 @@ extern "C" { #[doc = " Read packet from a connection.\n This fuction will wait on the connection's RX queue for the specified timeout.\n\n @param[in] conn connection\n @param[in] timeout timeout in mS to wait for a packet, use CSP_MAX_TIMEOUT for infinite timeout.\n @return Packet or NULL in case of failure or timeout."] pub fn csp_read(conn: *mut csp_conn_t, timeout: u32) -> *mut csp_packet_t; + + #[doc = " Handle CSP service request.\n If the given packet is a service-request (the destination port matches one of CSP service ports #csp_service_port_t),\n the packet will be processed by the specific CSP service handler.\n The packet will either process it or free it, so this function is typically called in the last \"default\" clause of\n a switch/case statement in a CSP listener task.\n In order to listen to csp service ports, bind your listener to the specific services ports #csp_service_port_t or\n use #CSP_ANY to all ports.\n\n @param[in] packet first packet, obtained by using csp_read()"] + pub fn csp_service_handler(packet: *mut csp_packet_t); + + #[doc = " Close an open connection.\n Any packets in the RX queue will be freed.\n\n @param[in] conn connection. Closing a NULL connection is acceptable.\n @return #CSP_ERR_NONE on success, otherwise an error code."] + pub fn csp_close(conn: *mut csp_conn_t) -> ::core::ffi::c_int; + + #[doc = " Send a single ping/echo packet.\n\n @param[in] node address of subsystem.\n @param[in] timeout timeout in ms to wait for reply.\n @param[in] size payload size in bytes.\n @param[in] opts connection options, see @ref CSP_CONNECTION_OPTIONS.\n @return >=0 echo time in mS on success, otherwise -1 for error."] + pub fn csp_ping( + node: u16, + timeout: u32, + size: ::core::ffi::c_uint, + opts: u8, + ) -> ::core::ffi::c_int; + + #[doc = " Reboot subsystem.\n If handled by the standard CSP service handler, the reboot handler set by csp_sys_set_reboot() on the subsystem, will be invoked.\n\n @param[in] node address of subsystem.\n"] + pub fn csp_reboot(node: u16); + + #[doc = " Establish outgoing connection.\n The call will return immediately, unless it is a RDP connection (#CSP_O_RDP) in which case it will wait until the other\n end acknowleges the connection (timeout is determined by the current connection timeout set by csp_rdp_set_opt()).\n\n @param[in] prio priority, see #csp_prio_t\n @param[in] dst Destination address\n @param[in] dst_port Destination port\n @param[in] timeout unused.\n @param[in] opts connection options, see @ref CSP_CONNECTION_OPTIONS.\n @return Established connection or NULL on failure (no free connections, timeout)."] + pub fn csp_connect( + prio: u8, + dst: u16, + dst_port: u8, + timeout: u32, + opts: u32, + ) -> *mut csp_conn_t; + + #[doc = " Return destination port of connection.\n\n @param[in] conn connection\n @return destination port of an incoming connection"] + pub fn csp_conn_dport(conn: *mut csp_conn_t) -> ::core::ffi::c_int; + + #[doc = " Get free buffer from task context.\n\n @param[in] unused OBSOLETE ignored field, csp packets have a fixed size now\n @return Buffer pointer to #csp_packet_t or NULL if no buffers available"] + pub fn csp_buffer_get(unused: usize) -> *mut csp_packet_t; + + #[doc = " Send packet on a connection.\n The packet buffer is automatically freed, and cannot be used after the call to csp_send()\n\n @param[in] conn connection\n @param[in] packet packet to send"] + pub fn csp_send(conn: *mut csp_conn_t, packet: *mut csp_packet_t); + + #[doc = " Print connection table to stdout."] + pub fn csp_conn_print_table(); + + pub fn csp_iflist_print(); } #[cfg(test)] diff --git a/libcsp-rust/src/lib.rs b/libcsp-rust/src/lib.rs index d4f1e31..b18e49a 100644 --- a/libcsp-rust/src/lib.rs +++ b/libcsp-rust/src/lib.rs @@ -50,6 +50,7 @@ pub enum CspError { /// Listen on all ports, primarily used with [csp_bind] pub const CSP_ANY: u8 = 255; +pub const CSP_LOOPBACK: u16 = 0; bitflags! { pub struct SocketFlags: u32 { @@ -101,13 +102,34 @@ pub enum MsgPriority { pub struct CspPacket(pub csp_packet_s); -pub struct CspPacketRef<'a>(&'a csp_packet_s); +pub struct CspPacketRef<'a>(&'a mut csp_packet_s); impl<'a> CspPacketRef<'a> { - pub fn packet_data(&self) -> &'a [u8; ffi::CSP_BUFFER_SIZE] { + pub fn packet_data(&self) -> &[u8] { + unsafe { &self.0.packet_data_union.data[..self.packet_length()] } + } + + pub fn whole_data(&self) -> &[u8; ffi::CSP_BUFFER_SIZE] { unsafe { &self.0.packet_data_union.data } } + pub fn whole_data_mut(&mut self) -> &mut [u8; ffi::CSP_BUFFER_SIZE] { + unsafe { &mut self.0.packet_data_union.data } + } + + pub fn set_data(&mut self, data: &[u8]) -> bool { + if data.len() > self.whole_data().len() { + return false; + } + self.whole_data_mut()[0..data.len()].copy_from_slice(data); + self.0.length = data.len() as u16; + true + } + + pub fn packet_length(&self) -> usize { + self.0.length.into() + } + pub fn inner(&self) -> *const csp_packet_s { self.0 } @@ -188,10 +210,11 @@ pub fn csp_route_work() -> Result<(), CspError> { if result == CspError::None as i32 { return Ok(()); } - Err(CspError::try_from(result).expect("unexpected error type from csp_route_work")) + Err(CspError::try_from(result) + .unwrap_or_else(|_| panic!("unexpected error value {} from csp_route_work", result))) } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct CspConn(csp_conn_s); impl CspConn { @@ -204,6 +227,30 @@ impl CspConn { } } +pub struct CspConnGuard(pub CspConn); + +impl Drop for CspConnGuard { + fn drop(&mut self) { + csp_close(self.0); + } +} + +impl AsRef for CspConnGuard { + fn as_ref(&self) -> &CspConn { + &self.0 + } +} + +impl AsMut for CspConnGuard { + fn as_mut(&mut self) -> &mut CspConn { + &mut self.0 + } +} + +pub fn csp_accept_guarded(socket: &mut CspSocket, timeout: Duration) -> Option { + Some(CspConnGuard(csp_accept(socket, timeout)?)) +} + /// Rust wrapper for [ffi::csp_accept]. pub fn csp_accept(socket: &mut CspSocket, timeout: Duration) -> Option { let timeout_millis = timeout.as_millis(); @@ -220,7 +267,11 @@ pub fn csp_accept(socket: &mut CspSocket, timeout: Duration) -> Option } /// Rust wrapper for [ffi::csp_read]. -pub fn csp_read(conn: &mut CspConn, timeout: Duration) -> Option> { +/// +/// # Safety +/// +/// - You MUST ensure that a connection is active when calling this function. +pub unsafe fn csp_read<'a>(mut conn: CspConn, timeout: Duration) -> Option> { let timeout_millis = timeout.as_millis(); if timeout_millis > u32::MAX as u128 { return None; @@ -229,7 +280,125 @@ pub fn csp_read(conn: &mut CspConn, timeout: Duration) -> Option i32 { + // SAFETY: FFI call. + unsafe { ffi::csp_conn_dport(&mut conn.0) } +} + +pub fn csp_service_handler(packet: &mut CspPacketRef) { + // SAFETY: FFI call. + unsafe { ffi::csp_service_handler(&mut *packet.0) } +} + +/// Rust wrapper for [ffi::csp_close]. +pub fn csp_close(mut conn: CspConn) -> i32 { + // SAFETY: FFI call. + unsafe { ffi::csp_close(&mut conn.0) } +} + +/// Rust wrapper for [ffi::csp_ping], returns the result code directly. +pub fn csp_ping_raw(node: u16, timeout: Duration, size: usize, opts: SocketFlags) -> i32 { + // SAFETY: FFI call. + unsafe { + ffi::csp_ping( + node, + timeout.as_millis() as u32, + size as u32, + opts.bits() as u8, + ) + } +} + +/// Rust wrapper for [ffi::csp_ping]. +pub fn csp_ping( + node: u16, + timeout: Duration, + size: usize, + opts: SocketFlags, +) -> Result<(), CspError> { + let result = csp_ping_raw(node, timeout, size, opts); + if result == CspError::None as i32 { + return Ok(()); + } + Err(CspError::try_from(result) + .unwrap_or_else(|_| panic!("unexpected error value {} from csp_ping", result))) +} + +/// Rust wrapper for [ffi::csp_reboot]. +pub fn csp_reboot(node: u16) { + // SAFETY: FFI call. + unsafe { ffi::csp_reboot(node) } +} + +/// Rust wrapper for [ffi::csp_connect]. +pub fn csp_connect( + prio: MsgPriority, + dst: u16, + dst_port: u8, + timeout: Duration, + opts: ConnectOpts, +) -> Option { + // SAFETY: FFI call. + let conn = unsafe { + ffi::csp_connect( + prio as u8, + dst, + dst_port, + timeout.as_millis() as u32, + opts.bits(), + ) + }; + if conn.is_null() { + return None; + } + // SAFETY: We checked that the pointer is valid. + Some(CspConn::new(unsafe { *conn }.address)) +} + +/// Rust wrapper for [ffi::csp_connect] which returns a guard structure. The connection will be +/// be closed automatically when the guard structure is dropped. +pub fn csp_connect_guarded( + prio: MsgPriority, + dst: u16, + dst_port: u8, + timeout: Duration, + opts: ConnectOpts, +) -> Option { + Some(CspConnGuard(csp_connect( + prio, dst, dst_port, timeout, opts, + )?)) +} + +/// Rust wrapper for [ffi::csp_buffer_get]. +pub fn csp_buffer_get() -> Option> { + let packet_ref = unsafe { + // The size argument is unused + ffi::csp_buffer_get(0) + }; + if packet_ref.is_null() { + return None; + } + // SAFETY: We checked that the pointer is valid. + Some(CspPacketRef(unsafe { &mut *packet_ref })) +} + +/// Rust wrapper for [ffi::csp_send]. +pub fn csp_send(mut conn: CspConn, packet: CspPacketRef) { + // SAFETY: FFI call. + unsafe { ffi::csp_send(&mut conn.0, packet.0) } +} + +/// Rust wrapper for [ffi::csp_conn_print_table]. +pub fn csp_conn_print_table() { + unsafe { ffi::csp_conn_print_table() } +} + +/// Rust wrapper for [ffi::csp_iflist_print]. +pub fn csp_iflist_print() { + unsafe { ffi::csp_iflist_print() } +}