Skip to content

Commit

Permalink
Improve Sonic Channel buffer management (that’s the proper way)
Browse files Browse the repository at this point in the history
Signed-off-by: Valerian Saliou <[email protected]>
  • Loading branch information
valeriansaliou committed Mar 25, 2019
1 parent fb07910 commit 1c2b9c8
Showing 1 changed file with 39 additions and 28 deletions.
67 changes: 39 additions & 28 deletions src/channel/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// Copyright: 2019, Valerian Saliou <[email protected]>
// License: Mozilla Public License v2.0 (MPL v2.0)

use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use std::result::Result;
Expand Down Expand Up @@ -90,7 +91,7 @@ impl ChannelHandle {
.expect("write failed");

// Initialize packet buffer
let mut buffer = Vec::with_capacity(MAX_LINE_SIZE);
let mut buffer: VecDeque<u8> = VecDeque::with_capacity(MAX_LINE_SIZE);

// Wait for incoming messages
'handler: loop {
Expand All @@ -103,37 +104,47 @@ impl ChannelHandle {
break;
}

// Buffer chunk
buffer.extend_from_slice(&read[0..n]);

// Should handle this chunk? (terminated)
if buffer[buffer.len() - 1] == BUFFER_LINE_SEPARATOR {
{
// Handle all buffered chunks as lines
let buffer_split =
buffer.split(|value| value == &BUFFER_LINE_SEPARATOR);

for line in buffer_split {
if line.is_empty() == false {
if Self::on_message(&mode, &stream, line)
== ChannelMessageResult::Close
{
// Should close?
break 'handler;
}
// Buffer overflow?
if (n + buffer.len()) > MAX_LINE_SIZE {
// Do not continue, as there is too much pending data in the \
// buffer. Most likely the client does not implement a proper \
// back-pressure management system, thus we terminate it.
info!("closing channel thread because of buffer overflow");

panic!("buffer overflow");
}

// Add chunk to buffer
buffer.extend(&read[0..n]);

// Handle full lines from buffer (keep the last incomplete line in \
// buffer)
{
let mut processed_line = Vec::with_capacity(MAX_LINE_SIZE);

while let Some(byte) = buffer.pop_front() {
// Commit line and start a new one?
if byte == BUFFER_LINE_SEPARATOR {
if Self::on_message(&mode, &stream, &processed_line)
== ChannelMessageResult::Close
{
// Should close?
break 'handler;
}

// Important: clear the contents of the line, as it has \
// just been processed.
processed_line.clear();
} else {
// Append current byte to processed line
processed_line.push(byte);
}
}

// Reset buffer
buffer.clear();
} else {
// This buffer does not end with a line separator; it likely \
// contains data that is way too long, and thus it should be \
// aborted to avoid stacking up too much data in a row.
info!("closing channel thread because of buffer overflow");

panic!("buffer overflow");
// Incomplete line remaining? Put it back in buffer.
if processed_line.is_empty() == false {
buffer.extend(processed_line);
}
}
}
Err(err) => {
Expand Down

0 comments on commit 1c2b9c8

Please sign in to comment.