Skip to content

Commit

Permalink
feat(transport): Dynamic load balancing (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
dawid-nowak authored May 15, 2020
1 parent 372da52 commit 85ae0a4
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 58 deletions.
10 changes: 9 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ path = "src/load_balance/client.rs"
name = "load-balance-server"
path = "src/load_balance/server.rs"

[[bin]]
name = "dynamic-load-balance-client"
path = "src/dynamic_load_balance/client.rs"

[[bin]]
name = "dynamic-load-balance-server"
path = "src/dynamic_load_balance/server.rs"

[[bin]]
name = "tls-client"
path = "src/tls/client.rs"
Expand Down Expand Up @@ -123,7 +131,7 @@ serde_json = "1.0"
rand = "0.7"
# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.2.0-alpha", features = ["tracing-log"] }
tracing-subscriber = { version = "0.2", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"
# Required for wellknown types
Expand Down
16 changes: 15 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ $ cargo run --bin authentication-client
$ cargo run --bin authentication-server
```

## Load balance
## Load Balance

### Client

Expand All @@ -58,6 +58,20 @@ $ cargo run --bin load-balance-client
$ cargo run --bin load-balance-server
```

## Dyanmic Load Balance

### Client

```bash
$ cargo run --bin dynamic-load-balance-client
```

### Server

```bash
$ cargo run --bin dynamic-load-balance-server
```

## TLS (rustls)

### Client
Expand Down
81 changes: 81 additions & 0 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}

use pb::{echo_client::EchoClient, EchoRequest};
use tonic::transport::Channel;

use tonic::transport::Endpoint;

use std::sync::Arc;

use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use tokio::time::timeout;
use tower::discover::Change;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051");
let e2 = Endpoint::from_static("http://[::1]:50052");

let (channel, mut rx) = Channel::balance_channel(10);
let mut client = EchoClient::new(channel);

let done = Arc::new(AtomicBool::new(false));
let demo_done = done.clone();
tokio::spawn(async move {
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added first endpoint");
let change = Change::Insert("1", e1);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added second endpoint");
let change = Change::Insert("2", e2);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed first endpoint");
let change = Change::Remove("1");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed second endpoint");
let change = Change::Remove("2");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Added third endpoint");
let e3 = Endpoint::from_static("http://[::1]:50051");
let change = Change::Insert("3", e3);
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
println!("Removed third endpoint");
let change = Change::Remove("3");
let res = rx.send(change).await;
println!("{:?}", res);
demo_done.swap(true, SeqCst);
});

while !done.load(SeqCst) {
tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await;
let request = tonic::Request::new(EchoRequest {
message: "hello".into(),
});

let rx = client.unary_echo(request);
if let Ok(resp) = timeout(tokio::time::Duration::from_secs(10), rx).await {
println!("RESPONSE={:?}", resp);
} else {
println!("did not receive value within 10 secs");
}
}

println!("... Bye");

Ok(())
}
82 changes: 82 additions & 0 deletions examples/src/dynamic_load_balance/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
pub mod pb {
tonic::include_proto!("grpc.examples.echo");
}

use futures::Stream;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::sync::mpsc;
use tonic::{transport::Server, Request, Response, Status, Streaming};

use pb::{EchoRequest, EchoResponse};

type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>;

#[derive(Debug)]
pub struct EchoServer {
addr: SocketAddr,
}

#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
async fn unary_echo(&self, request: Request<EchoRequest>) -> EchoResult<EchoResponse> {
let message = format!("{} (from {})", request.into_inner().message, self.addr);

Ok(Response::new(EchoResponse { message }))
}

type ServerStreamingEchoStream = ResponseStream;

async fn server_streaming_echo(
&self,
_: Request<EchoRequest>,
) -> EchoResult<Self::ServerStreamingEchoStream> {
Err(Status::unimplemented("not implemented"))
}

async fn client_streaming_echo(
&self,
_: Request<Streaming<EchoRequest>>,
) -> EchoResult<EchoResponse> {
Err(Status::unimplemented("not implemented"))
}

type BidirectionalStreamingEchoStream = ResponseStream;

async fn bidirectional_streaming_echo(
&self,
_: Request<Streaming<EchoRequest>>,
) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
Err(Status::unimplemented("not implemented"))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addrs = ["[::1]:50051", "[::1]:50052"];

let (tx, mut rx) = mpsc::unbounded_channel();

for addr in &addrs {
let addr = addr.parse()?;
let tx = tx.clone();

let server = EchoServer { addr };
let serve = Server::builder()
.add_service(pb::echo_server::EchoServer::new(server))
.serve(addr);

tokio::spawn(async move {
if let Err(e) = serve.await {
eprintln!("Error = {:?}", e);
}

tx.send(()).unwrap();
});
}

rx.recv().await;

Ok(())
}
37 changes: 25 additions & 12 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;

use super::service::{Connection, ServiceList};
use super::service::{Connection, DynamicServiceStream};
use crate::{body::BoxBody, client::GrpcService};
use bytes::Bytes;
use http::{
Expand All @@ -20,13 +20,18 @@ use hyper::client::connect::Connection as HyperConnection;
use std::{
fmt,
future::Future,
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{channel, Sender},
};

use tower::{
buffer::{self, Buffer},
discover::Discover,
discover::{Change, Discover},
util::{BoxService, Either},
Service,
};
Expand Down Expand Up @@ -104,17 +109,25 @@ impl Channel {
/// This creates a [`Channel`] that will load balance accross all the
/// provided endpoints.
pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
let list = list.collect::<Vec<_>>();
let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
.unwrap();
});

let buffer_size = list
.iter()
.next()
.and_then(|e| e.buffer_size)
.unwrap_or(DEFAULT_BUFFER_SIZE);

let discover = ServiceList::new(list);
channel
}

Self::balance(discover, buffer_size)
/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
where
K: Hash + Eq + Send + Clone + 'static,
{
let (tx, rx) = channel(capacity);
let list = DynamicServiceStream::new(rx);
(Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
}

pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
Expand Down
Loading

0 comments on commit 85ae0a4

Please sign in to comment.