Skip to content

Commit

Permalink
feat(socket): new transport API integration pt. 2
Browse files Browse the repository at this point in the history
  • Loading branch information
mempirate committed Jan 2, 2024
1 parent d5a2337 commit c57264c
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 101 deletions.
36 changes: 0 additions & 36 deletions msg-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,6 @@ pub mod durable;
pub mod quic;
pub mod tcp;

#[async_trait::async_trait]
pub trait ClientTransport {
type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;
type Error: std::error::Error + Send + Sync + 'static;
type ConnectOptions: Default + Clone + Send + Sync + 'static;

async fn connect_with_options(
addr: SocketAddr,
options: Self::ConnectOptions,
) -> Result<Self::Io, Self::Error>;
}

#[async_trait::async_trait]
pub trait ServerTransport: Unpin + Send + Sync + 'static {
type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;
type Error: std::error::Error + Send + Sync + 'static;
type BindOptions: Default + Send + Sync + 'static;

async fn bind_with_options(
addr: SocketAddr,
options: Self::BindOptions,
) -> Result<Self, Self::Error>
where
Self: Sized;

fn local_addr(&self) -> Result<SocketAddr, Self::Error>;

async fn accept(&self) -> Result<(Self::Io, SocketAddr), Self::Error>;

#[allow(clippy::type_complexity)]
fn poll_accept(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<(Self::Io, SocketAddr), Self::Error>>;
}

/// A transport provides connection-oriented communication between two peers through
/// ordered and reliable streams of bytes.
///
Expand Down
9 changes: 6 additions & 3 deletions msg/benches/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::{Duration, Instant};
use tokio::runtime::Runtime;

use msg_socket::{PubOptions, PubSocket, SubOptions, SubSocket};
use msg_transport::{Tcp, TcpConnectOptions};
use msg_transport::tcp::{self, Tcp};

const N_REQS: usize = 10_000;
const MSG_SIZE: usize = 512;
Expand Down Expand Up @@ -153,13 +153,15 @@ fn pubsub_single_thread_tcp(c: &mut Criterion) {
let buffer_size = 1024 * 64;

let publisher = PubSocket::with_options(
Tcp::default(),
PubOptions::default()
.flush_interval(Duration::from_micros(100))
.backpressure_boundary(buffer_size)
.session_buffer_size(N_REQS * 2),
);

let subscriber = SubSocket::with_options(
Tcp::default(),
SubOptions::default()
.read_buffer_size(buffer_size)
.ingress_buffer_size(N_REQS * 2),
Expand Down Expand Up @@ -196,17 +198,18 @@ fn pubsub_multi_thread_tcp(c: &mut Criterion) {
let buffer_size = 1024 * 64;

let publisher = PubSocket::with_options(
Tcp::default(),
PubOptions::default()
.flush_interval(Duration::from_micros(100))
.backpressure_boundary(buffer_size)
.session_buffer_size(N_REQS * 2),
);

let subscriber = SubSocket::with_options(
Tcp::new(tcp::Config::default().blocking_connect(true)),
SubOptions::default()
.read_buffer_size(buffer_size)
.ingress_buffer_size(N_REQS * 2)
.connect_options(TcpConnectOptions::default().blocking_connect()),
.ingress_buffer_size(N_REQS * 2),
);

let mut bench = PairBenchmark {
Expand Down
18 changes: 11 additions & 7 deletions msg/benches/reqrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use pprof::criterion::Output;
use rand::Rng;

use msg_socket::{RepSocket, ReqOptions, ReqSocket};
use msg_transport::Tcp;
use msg_transport::tcp::{self, Tcp};
use tokio::runtime::Runtime;

const N_REQS: usize = 10_000;
Expand Down Expand Up @@ -118,10 +118,12 @@ fn reqrep_single_thread_tcp(c: &mut Criterion) {
.build()
.unwrap();

let req =
ReqSocket::with_options(ReqOptions::default().flush_interval(Duration::from_micros(50)));
let req = ReqSocket::with_options(
Tcp::new(tcp::Config::default().blocking_connect(true)),
ReqOptions::default().flush_interval(Duration::from_micros(50)),
);

let rep = RepSocket::<Tcp>::new();
let rep = RepSocket::new(Tcp::default());

let mut bench = PairBenchmark {
rt,
Expand Down Expand Up @@ -150,10 +152,12 @@ fn reqrep_multi_thread_tcp(c: &mut Criterion) {
.build()
.unwrap();

let req =
ReqSocket::with_options(ReqOptions::default().flush_interval(Duration::from_micros(50)));
let req = ReqSocket::with_options(
Tcp::default(),
ReqOptions::default().flush_interval(Duration::from_micros(50)),
);

let rep = RepSocket::<Tcp>::new();
let rep = RepSocket::new(Tcp::default());

let mut bench = PairBenchmark {
rt,
Expand Down
15 changes: 7 additions & 8 deletions msg/examples/durable.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::time::Duration;

use bytes::Bytes;
use msg_transport::TcpConnectOptions;
use msg_transport::tcp;
use tokio::sync::oneshot;
use tokio_stream::StreamExt;

use msg::{Authenticator, RepSocket, ReqOptions, ReqSocket, Tcp};
use msg::{tcp::Tcp, Authenticator, RepSocket, ReqOptions, ReqSocket};
use tracing::Instrument;

#[derive(Default)]
Expand All @@ -23,9 +23,9 @@ impl Authenticator for Auth {
async fn start_rep() {
// Initialize the reply socket (server side) with a transport
// and an authenticator.
let mut rep = RepSocket::<Tcp>::new().with_auth(Auth);
let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth);
while rep.bind("0.0.0.0:4444".parse().unwrap()).await.is_err() {
rep = RepSocket::<Tcp>::new().with_auth(Auth);
rep = RepSocket::new(Tcp::default()).with_auth(Auth);
tracing::warn!("Failed to bind rep socket, retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down Expand Up @@ -65,10 +65,9 @@ async fn main() {

// Initialize the request socket (client side) with a transport
// and an identifier. This will implicitly turn on client authentication.
let mut req = ReqSocket::<Tcp>::with_options(
ReqOptions::default()
.timeout(Duration::from_secs(4))
.connect_options(TcpConnectOptions::default().auth_token(Bytes::from("client1"))),
let mut req = ReqSocket::with_options(
Tcp::new(tcp::Config::default().auth_token(Bytes::from("client1"))),
ReqOptions::default().timeout(Duration::from_secs(4)),
);

let (tx, rx) = oneshot::channel();
Expand Down
23 changes: 12 additions & 11 deletions msg/examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
use bytes::Bytes;
use futures::StreamExt;
use msg_transport::TcpConnectOptions;
use std::time::Duration;
use tokio::time::timeout;
use tracing::Instrument;

use msg::{PubOptions, PubSocket, SubOptions, SubSocket, Tcp};
use msg::{
tcp::{self, Tcp},
PubOptions, PubSocket, SubOptions, SubSocket,
};

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt::try_init();
// Configure the publisher socket with options
let mut pub_socket = PubSocket::<Tcp>::with_options(
let mut pub_socket = PubSocket::with_options(
Tcp::default(),
PubOptions::default()
.backpressure_boundary(8192)
.session_buffer_size(1024)
.flush_interval(Duration::from_micros(100)),
);

// Configure the subscribers with options
let mut sub1 = SubSocket::<Tcp>::with_options(
let mut sub1 = SubSocket::with_options(
// TCP transport with blocking connect, usually connection happens in the background.
SubOptions::default()
.ingress_buffer_size(1024)
.connect_options(TcpConnectOptions::default().blocking_connect()),
Tcp::new(tcp::Config::default().blocking_connect(true)),
SubOptions::default().ingress_buffer_size(1024),
);

let mut sub2 = SubSocket::<Tcp>::with_options(
let mut sub2 = SubSocket::with_options(
// TCP transport with blocking connect, usually connection happens in the background.
SubOptions::default()
.ingress_buffer_size(1024)
.connect_options(TcpConnectOptions::default().blocking_connect()),
Tcp::new(tcp::Config::default().blocking_connect(true)),
SubOptions::default().ingress_buffer_size(1024),
);

tracing::info!("Setting up the sockets...");
Expand Down
29 changes: 14 additions & 15 deletions msg/examples/pubsub_auth.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use bytes::Bytes;
use futures::StreamExt;
use msg_transport::TcpConnectOptions;
use std::time::Duration;
use tokio::time::timeout;
use tracing::Instrument;

use msg::{Authenticator, PubSocket, SubOptions, SubSocket, Tcp};
use msg::{
tcp::{self, Tcp},
Authenticator, PubSocket, SubSocket,
};

#[derive(Default)]
struct Auth;
Expand All @@ -27,29 +29,26 @@ impl Authenticator for Auth {
async fn main() {
let _ = tracing_subscriber::fmt::try_init();
// Configure the publisher socket with options
let mut pub_socket = PubSocket::<Tcp>::new()
let mut pub_socket = PubSocket::new(Tcp::default())
// Enable the authenticator
.with_auth(Auth);

// Configure the subscribers with options
let mut sub1 = SubSocket::<Tcp>::with_options(
let mut sub1 = SubSocket::new(
// TCP transport with blocking connect, usually connection happens in the background.
// Set the auth token
SubOptions::default().connect_options(
TcpConnectOptions::default()
Tcp::new(
tcp::Config::default()
.auth_token(Bytes::from("client1"))
.blocking_connect(),
.blocking_connect(true),
),
);

let mut sub2 = SubSocket::<Tcp>::with_options(
// Set the auth token
SubOptions::default().connect_options(
TcpConnectOptions::default()
.auth_token(Bytes::from("client2"))
.blocking_connect(),
),
);
let mut sub2 = SubSocket::new(Tcp::new(
tcp::Config::default()
.auth_token(Bytes::from("client2"))
.blocking_connect(true),
));

tracing::info!("Setting up the sockets...");
pub_socket
Expand Down
22 changes: 11 additions & 11 deletions msg/examples/pubsub_compression.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
use bytes::Bytes;
use futures::StreamExt;
use msg_socket::SubOptions;
use msg_transport::TcpConnectOptions;
use std::time::Duration;
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tracing::Instrument;

use msg::{
compression::{GzipCompressor, GzipDecompressor},
PubSocket, SubSocket, Tcp,
tcp::{self, Tcp},
PubSocket, SubSocket,
};

#[tokio::main]
async fn main() {
let _ = tracing_subscriber::fmt::try_init();
// Configure the publisher socket with options
let mut pub_socket = PubSocket::<Tcp>::new()
let mut pub_socket = PubSocket::new(Tcp::default())
// Enable Gzip compression (compression level 6)
.with_compressor(GzipCompressor::new(6));

// Configure the subscribers with options
let mut sub1 = SubSocket::<Tcp>::with_options(
let mut sub1 = SubSocket::new(
// TCP transport with blocking connect, usually connection happens in the background.
SubOptions::default().connect_options(TcpConnectOptions::default().blocking_connect()),
Tcp::new(tcp::Config::default().blocking_connect(true)),
)
// Enable Gzip decompression (at the same level)
// Enable Gzip decompression
.with_decompressor(GzipDecompressor::new());

let mut sub2 = SubSocket::<Tcp>::with_options(
// Configure the subscribers with options
let mut sub2 = SubSocket::new(
// TCP transport with blocking connect, usually connection happens in the background.
SubOptions::default().connect_options(TcpConnectOptions::default().blocking_connect()),
Tcp::new(tcp::Config::default().blocking_connect(true)),
)
// Enable Gzip decompression (at the same level)
// Enable Gzip decompression
.with_decompressor(GzipDecompressor::new());

tracing::info!("Setting up the sockets...");
Expand Down
6 changes: 3 additions & 3 deletions msg/examples/reqrep.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, Tcp};
use msg::{tcp::Tcp, RepSocket, ReqSocket};

#[tokio::main]
async fn main() {
// Initialize the reply socket (server side) with a transport
let mut rep = RepSocket::<Tcp>::new();
let mut rep = RepSocket::new(Tcp::default());
rep.bind("0.0.0.0:4444".parse().unwrap()).await.unwrap();

// Initialize the request socket (client side) with a transport
let mut req = ReqSocket::<Tcp>::new();
let mut req = ReqSocket::new(Tcp::default());
req.connect("0.0.0.0:4444".parse().unwrap()).await.unwrap();

tokio::spawn(async move {
Expand Down
15 changes: 8 additions & 7 deletions msg/examples/reqrep_auth.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Bytes;
use msg_transport::TcpConnectOptions;
use tokio_stream::StreamExt;

use msg::{Authenticator, RepSocket, ReqOptions, ReqSocket, Tcp};
use msg::{
tcp::{self, Tcp},
Authenticator, RepSocket, ReqSocket,
};

#[derive(Default)]
struct Auth;
Expand All @@ -19,15 +21,14 @@ impl Authenticator for Auth {
async fn main() {
// Initialize the reply socket (server side) with a transport
// and an authenticator.
let mut rep = RepSocket::<Tcp>::new().with_auth(Auth);
let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth);
rep.bind("0.0.0.0:4444".parse().unwrap()).await.unwrap();

// Initialize the request socket (client side) with a transport
// and an identifier. This will implicitly turn on client authentication.
let mut req = ReqSocket::<Tcp>::with_options(
ReqOptions::default()
.connect_options(TcpConnectOptions::default().auth_token(Bytes::from("client1"))),
);
let mut req = ReqSocket::new(Tcp::new(
tcp::Config::default().auth_token(Bytes::from("client1")),
));

req.connect("0.0.0.0:4444".parse().unwrap()).await.unwrap();

Expand Down

0 comments on commit c57264c

Please sign in to comment.