This commit is contained in:
parent
567a0a1cf5
commit
afd7999d5c
@ -148,6 +148,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
///
|
///
|
||||||
/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
/// fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
|
||||||
/// let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
/// let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
|
||||||
|
/// println!("Received TC: {:x?}", tc_raw);
|
||||||
/// tc_queue.push_back(tc_raw.to_vec());
|
/// tc_queue.push_back(tc_raw.to_vec());
|
||||||
/// Ok(())
|
/// Ok(())
|
||||||
/// }
|
/// }
|
||||||
@ -178,6 +179,7 @@ impl<TmError, TcError> TcpTmSender<TmError, TcError> for CobsTmSender {
|
|||||||
/// next_vec.len()
|
/// next_vec.len()
|
||||||
/// );
|
/// );
|
||||||
/// }
|
/// }
|
||||||
|
/// println!("Sending and encoding TM: {:x?}", next_vec);
|
||||||
/// let next_vec = tm_queue.pop_front().unwrap();
|
/// let next_vec = tm_queue.pop_front().unwrap();
|
||||||
/// buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
/// buffer[0..next_vec.len()].copy_from_slice(&next_vec);
|
||||||
/// return Ok(next_vec.len());
|
/// return Ok(next_vec.len());
|
||||||
@ -386,9 +388,17 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
fn encode_simple_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||||
|
encode_packet(&SIMPLE_PACKET, encoded_buf, current_idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_inverted_packet(encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||||
|
encode_packet(&INVERTED_PACKET, encoded_buf, current_idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_packet(packet: &[u8], encoded_buf: &mut [u8], current_idx: &mut usize) {
|
||||||
encoded_buf[*current_idx] = 0;
|
encoded_buf[*current_idx] = 0;
|
||||||
*current_idx += 1;
|
*current_idx += 1;
|
||||||
*current_idx += encode(&SIMPLE_PACKET, &mut encoded_buf[*current_idx..]);
|
*current_idx += encode(packet, &mut encoded_buf[*current_idx..]);
|
||||||
encoded_buf[*current_idx] = 0;
|
encoded_buf[*current_idx] = 0;
|
||||||
*current_idx += 1;
|
*current_idx += 1;
|
||||||
}
|
}
|
||||||
@ -401,7 +411,7 @@ mod tests {
|
|||||||
TcpTmtcInCobsServer::new(
|
TcpTmtcInCobsServer::new(
|
||||||
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
|
||||||
Box::new(tm_source),
|
Box::new(tm_source),
|
||||||
Box::new(tc_receiver.clone()),
|
Box::new(tc_receiver),
|
||||||
)
|
)
|
||||||
.expect("TCP server generation failed")
|
.expect("TCP server generation failed")
|
||||||
}
|
}
|
||||||
@ -411,8 +421,7 @@ mod tests {
|
|||||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
let tc_receiver = SyncTcCacher::default();
|
let tc_receiver = SyncTcCacher::default();
|
||||||
let tm_source = SyncTmSource::default();
|
let tm_source = SyncTmSource::default();
|
||||||
let mut tcp_server =
|
let mut tcp_server = generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source);
|
||||||
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
|
|
||||||
let dest_addr = tcp_server
|
let dest_addr = tcp_server
|
||||||
.local_addr()
|
.local_addr()
|
||||||
.expect("retrieving dest addr failed");
|
.expect("retrieving dest addr failed");
|
||||||
@ -458,14 +467,12 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_server_basic_no_tm_multi_tc() {}
|
fn test_server_basic_multi_tm_multi_tc() {
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_server_basic_with_tm() {
|
|
||||||
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||||
let tc_receiver = SyncTcCacher::default();
|
let tc_receiver = SyncTcCacher::default();
|
||||||
let mut tm_source = SyncTmSource::default();
|
let mut tm_source = SyncTmSource::default();
|
||||||
tm_source.add_tm(&INVERTED_PACKET);
|
tm_source.add_tm(&INVERTED_PACKET);
|
||||||
|
tm_source.add_tm(&SIMPLE_PACKET);
|
||||||
let mut tcp_server =
|
let mut tcp_server =
|
||||||
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
|
generic_tmtc_server(&auto_port_addr, tc_receiver.clone(), tm_source.clone());
|
||||||
let dest_addr = tcp_server
|
let dest_addr = tcp_server
|
||||||
@ -480,15 +487,19 @@ mod tests {
|
|||||||
panic!("handling connection failed: {:?}", result.unwrap_err());
|
panic!("handling connection failed: {:?}", result.unwrap_err());
|
||||||
}
|
}
|
||||||
let conn_result = result.unwrap();
|
let conn_result = result.unwrap();
|
||||||
assert_eq!(conn_result.num_received_tcs, 1);
|
assert_eq!(conn_result.num_received_tcs, 2, "Not enough TCs received");
|
||||||
assert_eq!(conn_result.num_sent_tms, 1);
|
assert_eq!(conn_result.num_sent_tms, 2, "Not enough TMs received");
|
||||||
set_if_done.store(true, Ordering::Relaxed);
|
set_if_done.store(true, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
// Send TC to server now.
|
// Send TC to server now.
|
||||||
let mut encoded_buf: [u8; 16] = [0; 16];
|
let mut encoded_buf: [u8; 32] = [0; 32];
|
||||||
let mut current_idx = 0;
|
let mut current_idx = 0;
|
||||||
encode_simple_packet(&mut encoded_buf, &mut current_idx);
|
encode_simple_packet(&mut encoded_buf, &mut current_idx);
|
||||||
|
encode_inverted_packet(&mut encoded_buf, &mut current_idx);
|
||||||
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
|
||||||
|
stream
|
||||||
|
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||||
|
.expect("setting reas timeout failed");
|
||||||
stream
|
stream
|
||||||
.write_all(&encoded_buf[..current_idx])
|
.write_all(&encoded_buf[..current_idx])
|
||||||
.expect("writing to TCP server failed");
|
.expect("writing to TCP server failed");
|
||||||
@ -497,18 +508,49 @@ mod tests {
|
|||||||
.shutdown(std::net::Shutdown::Write)
|
.shutdown(std::net::Shutdown::Write)
|
||||||
.expect("shutting down write failed");
|
.expect("shutting down write failed");
|
||||||
let mut read_buf: [u8; 16] = [0; 16];
|
let mut read_buf: [u8; 16] = [0; 16];
|
||||||
let read_len = stream.read(&mut read_buf).expect("read failed");
|
let mut read_len_total = 0;
|
||||||
drop(stream);
|
// Timeout ensures this does not block forever.
|
||||||
|
while read_len_total < 16 {
|
||||||
|
let read_len = stream.read(&mut read_buf).expect("read failed");
|
||||||
|
read_len_total += read_len;
|
||||||
|
// Read until full expected size is available.
|
||||||
|
if read_len == 16 {
|
||||||
|
// Read first TM packet.
|
||||||
|
current_idx = 0;
|
||||||
|
assert_eq!(read_len, 16);
|
||||||
|
assert_eq!(read_buf[0], 0);
|
||||||
|
current_idx += 1;
|
||||||
|
let mut dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
|
||||||
|
.expect("COBS decoding failed");
|
||||||
|
assert_eq!(dec_report.dst_used, 5);
|
||||||
|
// Skip first sentinel byte.
|
||||||
|
assert_eq!(
|
||||||
|
&read_buf[current_idx..current_idx + INVERTED_PACKET.len()],
|
||||||
|
&INVERTED_PACKET
|
||||||
|
);
|
||||||
|
current_idx += dec_report.src_used;
|
||||||
|
// End sentinel.
|
||||||
|
assert_eq!(read_buf[current_idx], 0, "invalid sentinel end byte");
|
||||||
|
current_idx += 1;
|
||||||
|
|
||||||
// 1 byte encoding overhead, 2 sentinel bytes.
|
// Read second TM packet.
|
||||||
assert_eq!(read_len, 8);
|
assert_eq!(read_buf[current_idx], 0);
|
||||||
assert_eq!(read_buf[0], 0);
|
current_idx += 1;
|
||||||
assert_eq!(read_buf[read_len - 1], 0);
|
dec_report = cobs::decode_in_place_report(&mut read_buf[current_idx..])
|
||||||
let decoded_len =
|
.expect("COBS decoding failed");
|
||||||
cobs::decode_in_place(&mut read_buf[1..read_len]).expect("COBS decoding failed");
|
assert_eq!(dec_report.dst_used, 5);
|
||||||
assert_eq!(decoded_len, 5);
|
// Skip first sentinel byte.
|
||||||
// Skip first sentinel byte.
|
assert_eq!(
|
||||||
assert_eq!(&read_buf[1..1 + INVERTED_PACKET.len()], &INVERTED_PACKET);
|
&read_buf[current_idx..current_idx + SIMPLE_PACKET.len()],
|
||||||
|
&SIMPLE_PACKET
|
||||||
|
);
|
||||||
|
current_idx += dec_report.src_used;
|
||||||
|
// End sentinel.
|
||||||
|
assert_eq!(read_buf[current_idx], 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(stream);
|
||||||
|
|
||||||
// A certain amount of time is allowed for the transaction to complete.
|
// A certain amount of time is allowed for the transaction to complete.
|
||||||
for _ in 0..3 {
|
for _ in 0..3 {
|
||||||
@ -524,8 +566,9 @@ mod tests {
|
|||||||
.tc_queue
|
.tc_queue
|
||||||
.lock()
|
.lock()
|
||||||
.expect("locking tc queue failed");
|
.expect("locking tc queue failed");
|
||||||
assert_eq!(tc_queue.len(), 1);
|
assert_eq!(tc_queue.len(), 2);
|
||||||
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
assert_eq!(tc_queue.pop_front().unwrap(), &SIMPLE_PACKET);
|
||||||
|
assert_eq!(tc_queue.pop_front().unwrap(), &INVERTED_PACKET);
|
||||||
drop(tc_queue);
|
drop(tc_queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,11 +44,10 @@ pub fn parse_buffer_for_cobs_encoded_packets<E>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Split frame at the end for a multi-packet frame. Move it to the front of the buffer.
|
// Move split frame at the end to the front of the buffer.
|
||||||
if start_index_packet > 0 && start_found && packets_found > 0 {
|
if start_index_packet > 0 && start_found && packets_found > 0 {
|
||||||
let (first_seg, last_seg) = buf.split_at_mut(start_index_packet - 1);
|
buf.copy_within(start_index_packet - 1.., 0);
|
||||||
first_seg[..last_seg.len()].copy_from_slice(last_seg);
|
*next_write_idx = buf.len() - start_index_packet + 1;
|
||||||
*next_write_idx = last_seg.len();
|
|
||||||
}
|
}
|
||||||
Ok(packets_found)
|
Ok(packets_found)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user