diff --git a/msg-transport/src/lib.rs b/msg-transport/src/lib.rs index 254a356..db7d7b7 100644 --- a/msg-transport/src/lib.rs +++ b/msg-transport/src/lib.rs @@ -11,42 +11,6 @@ pub mod durable; pub mod quic; pub mod tcp; -#[async_trait::async_trait] -pub trait ClientTransport { - type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static; - type Error: std::error::Error + Send + Sync + 'static; - type ConnectOptions: Default + Clone + Send + Sync + 'static; - - async fn connect_with_options( - addr: SocketAddr, - options: Self::ConnectOptions, - ) -> Result; -} - -#[async_trait::async_trait] -pub trait ServerTransport: Unpin + Send + Sync + 'static { - type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static; - type Error: std::error::Error + Send + Sync + 'static; - type BindOptions: Default + Send + Sync + 'static; - - async fn bind_with_options( - addr: SocketAddr, - options: Self::BindOptions, - ) -> Result - where - Self: Sized; - - fn local_addr(&self) -> Result; - - async fn accept(&self) -> Result<(Self::Io, SocketAddr), Self::Error>; - - #[allow(clippy::type_complexity)] - fn poll_accept( - &self, - cx: &mut Context<'_>, - ) -> Poll>; -} - /// A transport provides connection-oriented communication between two peers through /// ordered and reliable streams of bytes. /// diff --git a/msg/benches/pubsub.rs b/msg/benches/pubsub.rs index 7b09ecb..cae64ce 100644 --- a/msg/benches/pubsub.rs +++ b/msg/benches/pubsub.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use msg_socket::{PubOptions, PubSocket, SubOptions, SubSocket}; -use msg_transport::{Tcp, TcpConnectOptions}; +use msg_transport::tcp::{self, Tcp}; const N_REQS: usize = 10_000; const MSG_SIZE: usize = 512; @@ -153,6 +153,7 @@ fn pubsub_single_thread_tcp(c: &mut Criterion) { let buffer_size = 1024 * 64; let publisher = PubSocket::with_options( + Tcp::default(), PubOptions::default() .flush_interval(Duration::from_micros(100)) .backpressure_boundary(buffer_size) @@ -160,6 +161,7 @@ fn pubsub_single_thread_tcp(c: &mut Criterion) { ); let subscriber = SubSocket::with_options( + Tcp::default(), SubOptions::default() .read_buffer_size(buffer_size) .ingress_buffer_size(N_REQS * 2), @@ -196,6 +198,7 @@ fn pubsub_multi_thread_tcp(c: &mut Criterion) { let buffer_size = 1024 * 64; let publisher = PubSocket::with_options( + Tcp::default(), PubOptions::default() .flush_interval(Duration::from_micros(100)) .backpressure_boundary(buffer_size) @@ -203,10 +206,10 @@ fn pubsub_multi_thread_tcp(c: &mut Criterion) { ); let subscriber = SubSocket::with_options( + Tcp::new(tcp::Config::default().blocking_connect(true)), SubOptions::default() .read_buffer_size(buffer_size) - .ingress_buffer_size(N_REQS * 2) - .connect_options(TcpConnectOptions::default().blocking_connect()), + .ingress_buffer_size(N_REQS * 2), ); let mut bench = PairBenchmark { diff --git a/msg/benches/reqrep.rs b/msg/benches/reqrep.rs index 182aef4..58383af 100644 --- a/msg/benches/reqrep.rs +++ b/msg/benches/reqrep.rs @@ -10,7 +10,7 @@ use pprof::criterion::Output; use rand::Rng; use msg_socket::{RepSocket, ReqOptions, ReqSocket}; -use msg_transport::Tcp; +use msg_transport::tcp::{self, Tcp}; use tokio::runtime::Runtime; const N_REQS: usize = 10_000; @@ -118,10 +118,12 @@ fn reqrep_single_thread_tcp(c: &mut Criterion) { .build() .unwrap(); - let req = - ReqSocket::with_options(ReqOptions::default().flush_interval(Duration::from_micros(50))); + let req = ReqSocket::with_options( + Tcp::new(tcp::Config::default().blocking_connect(true)), + ReqOptions::default().flush_interval(Duration::from_micros(50)), + ); - let rep = RepSocket::::new(); + let rep = RepSocket::new(Tcp::default()); let mut bench = PairBenchmark { rt, @@ -150,10 +152,12 @@ fn reqrep_multi_thread_tcp(c: &mut Criterion) { .build() .unwrap(); - let req = - ReqSocket::with_options(ReqOptions::default().flush_interval(Duration::from_micros(50))); + let req = ReqSocket::with_options( + Tcp::default(), + ReqOptions::default().flush_interval(Duration::from_micros(50)), + ); - let rep = RepSocket::::new(); + let rep = RepSocket::new(Tcp::default()); let mut bench = PairBenchmark { rt, diff --git a/msg/examples/durable.rs b/msg/examples/durable.rs index f1b0a3e..c2c3e66 100644 --- a/msg/examples/durable.rs +++ b/msg/examples/durable.rs @@ -1,11 +1,11 @@ use std::time::Duration; use bytes::Bytes; -use msg_transport::TcpConnectOptions; +use msg_transport::tcp; use tokio::sync::oneshot; use tokio_stream::StreamExt; -use msg::{Authenticator, RepSocket, ReqOptions, ReqSocket, Tcp}; +use msg::{tcp::Tcp, Authenticator, RepSocket, ReqOptions, ReqSocket}; use tracing::Instrument; #[derive(Default)] @@ -23,9 +23,9 @@ impl Authenticator for Auth { async fn start_rep() { // Initialize the reply socket (server side) with a transport // and an authenticator. - let mut rep = RepSocket::::new().with_auth(Auth); + let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth); while rep.bind("0.0.0.0:4444".parse().unwrap()).await.is_err() { - rep = RepSocket::::new().with_auth(Auth); + rep = RepSocket::new(Tcp::default()).with_auth(Auth); tracing::warn!("Failed to bind rep socket, retrying..."); tokio::time::sleep(Duration::from_secs(1)).await; } @@ -65,10 +65,9 @@ async fn main() { // Initialize the request socket (client side) with a transport // and an identifier. This will implicitly turn on client authentication. - let mut req = ReqSocket::::with_options( - ReqOptions::default() - .timeout(Duration::from_secs(4)) - .connect_options(TcpConnectOptions::default().auth_token(Bytes::from("client1"))), + let mut req = ReqSocket::with_options( + Tcp::new(tcp::Config::default().auth_token(Bytes::from("client1"))), + ReqOptions::default().timeout(Duration::from_secs(4)), ); let (tx, rx) = oneshot::channel(); diff --git a/msg/examples/pubsub.rs b/msg/examples/pubsub.rs index f597dab..2526f23 100644 --- a/msg/examples/pubsub.rs +++ b/msg/examples/pubsub.rs @@ -1,17 +1,20 @@ use bytes::Bytes; use futures::StreamExt; -use msg_transport::TcpConnectOptions; use std::time::Duration; use tokio::time::timeout; use tracing::Instrument; -use msg::{PubOptions, PubSocket, SubOptions, SubSocket, Tcp}; +use msg::{ + tcp::{self, Tcp}, + PubOptions, PubSocket, SubOptions, SubSocket, +}; #[tokio::main] async fn main() { let _ = tracing_subscriber::fmt::try_init(); // Configure the publisher socket with options - let mut pub_socket = PubSocket::::with_options( + let mut pub_socket = PubSocket::with_options( + Tcp::default(), PubOptions::default() .backpressure_boundary(8192) .session_buffer_size(1024) @@ -19,18 +22,16 @@ async fn main() { ); // Configure the subscribers with options - let mut sub1 = SubSocket::::with_options( + let mut sub1 = SubSocket::with_options( // TCP transport with blocking connect, usually connection happens in the background. - SubOptions::default() - .ingress_buffer_size(1024) - .connect_options(TcpConnectOptions::default().blocking_connect()), + Tcp::new(tcp::Config::default().blocking_connect(true)), + SubOptions::default().ingress_buffer_size(1024), ); - let mut sub2 = SubSocket::::with_options( + let mut sub2 = SubSocket::with_options( // TCP transport with blocking connect, usually connection happens in the background. - SubOptions::default() - .ingress_buffer_size(1024) - .connect_options(TcpConnectOptions::default().blocking_connect()), + Tcp::new(tcp::Config::default().blocking_connect(true)), + SubOptions::default().ingress_buffer_size(1024), ); tracing::info!("Setting up the sockets..."); diff --git a/msg/examples/pubsub_auth.rs b/msg/examples/pubsub_auth.rs index 8b520d4..0bdab33 100644 --- a/msg/examples/pubsub_auth.rs +++ b/msg/examples/pubsub_auth.rs @@ -1,11 +1,13 @@ use bytes::Bytes; use futures::StreamExt; -use msg_transport::TcpConnectOptions; use std::time::Duration; use tokio::time::timeout; use tracing::Instrument; -use msg::{Authenticator, PubSocket, SubOptions, SubSocket, Tcp}; +use msg::{ + tcp::{self, Tcp}, + Authenticator, PubSocket, SubSocket, +}; #[derive(Default)] struct Auth; @@ -27,29 +29,26 @@ impl Authenticator for Auth { async fn main() { let _ = tracing_subscriber::fmt::try_init(); // Configure the publisher socket with options - let mut pub_socket = PubSocket::::new() + let mut pub_socket = PubSocket::new(Tcp::default()) // Enable the authenticator .with_auth(Auth); // Configure the subscribers with options - let mut sub1 = SubSocket::::with_options( + let mut sub1 = SubSocket::new( // TCP transport with blocking connect, usually connection happens in the background. // Set the auth token - SubOptions::default().connect_options( - TcpConnectOptions::default() + Tcp::new( + tcp::Config::default() .auth_token(Bytes::from("client1")) - .blocking_connect(), + .blocking_connect(true), ), ); - let mut sub2 = SubSocket::::with_options( - // Set the auth token - SubOptions::default().connect_options( - TcpConnectOptions::default() - .auth_token(Bytes::from("client2")) - .blocking_connect(), - ), - ); + let mut sub2 = SubSocket::new(Tcp::new( + tcp::Config::default() + .auth_token(Bytes::from("client2")) + .blocking_connect(true), + )); tracing::info!("Setting up the sockets..."); pub_socket diff --git a/msg/examples/pubsub_compression.rs b/msg/examples/pubsub_compression.rs index eef99ad..72cdedb 100644 --- a/msg/examples/pubsub_compression.rs +++ b/msg/examples/pubsub_compression.rs @@ -1,37 +1,37 @@ use bytes::Bytes; -use futures::StreamExt; -use msg_socket::SubOptions; -use msg_transport::TcpConnectOptions; use std::time::Duration; use tokio::time::timeout; +use tokio_stream::StreamExt; use tracing::Instrument; use msg::{ compression::{GzipCompressor, GzipDecompressor}, - PubSocket, SubSocket, Tcp, + tcp::{self, Tcp}, + PubSocket, SubSocket, }; #[tokio::main] async fn main() { let _ = tracing_subscriber::fmt::try_init(); // Configure the publisher socket with options - let mut pub_socket = PubSocket::::new() + let mut pub_socket = PubSocket::new(Tcp::default()) // Enable Gzip compression (compression level 6) .with_compressor(GzipCompressor::new(6)); // Configure the subscribers with options - let mut sub1 = SubSocket::::with_options( + let mut sub1 = SubSocket::new( // TCP transport with blocking connect, usually connection happens in the background. - SubOptions::default().connect_options(TcpConnectOptions::default().blocking_connect()), + Tcp::new(tcp::Config::default().blocking_connect(true)), ) - // Enable Gzip decompression (at the same level) + // Enable Gzip decompression .with_decompressor(GzipDecompressor::new()); - let mut sub2 = SubSocket::::with_options( + // Configure the subscribers with options + let mut sub2 = SubSocket::new( // TCP transport with blocking connect, usually connection happens in the background. - SubOptions::default().connect_options(TcpConnectOptions::default().blocking_connect()), + Tcp::new(tcp::Config::default().blocking_connect(true)), ) - // Enable Gzip decompression (at the same level) + // Enable Gzip decompression .with_decompressor(GzipDecompressor::new()); tracing::info!("Setting up the sockets..."); diff --git a/msg/examples/reqrep.rs b/msg/examples/reqrep.rs index 72f4330..60167f0 100644 --- a/msg/examples/reqrep.rs +++ b/msg/examples/reqrep.rs @@ -1,16 +1,16 @@ use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, Tcp}; +use msg::{tcp::Tcp, RepSocket, ReqSocket}; #[tokio::main] async fn main() { // Initialize the reply socket (server side) with a transport - let mut rep = RepSocket::::new(); + let mut rep = RepSocket::new(Tcp::default()); rep.bind("0.0.0.0:4444".parse().unwrap()).await.unwrap(); // Initialize the request socket (client side) with a transport - let mut req = ReqSocket::::new(); + let mut req = ReqSocket::new(Tcp::default()); req.connect("0.0.0.0:4444".parse().unwrap()).await.unwrap(); tokio::spawn(async move { diff --git a/msg/examples/reqrep_auth.rs b/msg/examples/reqrep_auth.rs index 82ff0fe..792db1b 100644 --- a/msg/examples/reqrep_auth.rs +++ b/msg/examples/reqrep_auth.rs @@ -1,8 +1,10 @@ use bytes::Bytes; -use msg_transport::TcpConnectOptions; use tokio_stream::StreamExt; -use msg::{Authenticator, RepSocket, ReqOptions, ReqSocket, Tcp}; +use msg::{ + tcp::{self, Tcp}, + Authenticator, RepSocket, ReqSocket, +}; #[derive(Default)] struct Auth; @@ -19,15 +21,14 @@ impl Authenticator for Auth { async fn main() { // Initialize the reply socket (server side) with a transport // and an authenticator. - let mut rep = RepSocket::::new().with_auth(Auth); + let mut rep = RepSocket::new(Tcp::default()).with_auth(Auth); rep.bind("0.0.0.0:4444".parse().unwrap()).await.unwrap(); // Initialize the request socket (client side) with a transport // and an identifier. This will implicitly turn on client authentication. - let mut req = ReqSocket::::with_options( - ReqOptions::default() - .connect_options(TcpConnectOptions::default().auth_token(Bytes::from("client1"))), - ); + let mut req = ReqSocket::new(Tcp::new( + tcp::Config::default().auth_token(Bytes::from("client1")), + )); req.connect("0.0.0.0:4444".parse().unwrap()).await.unwrap();