Skip to content

Commit

Permalink
Unify client::Context and server::Context, removing client::Context.c…
Browse files Browse the repository at this point in the history
…lient_addr in the process
  • Loading branch information
tikue committed Aug 31, 2018
1 parent 984c20c commit 4374d5a
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 206 deletions.
3 changes: 2 additions & 1 deletion bincode-transport/tests/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use futures::compat::TokioDefaultSpawner;
use futures::prelude::*;
use humantime::{format_duration, FormattedDuration};
use rpc::{
context,
client::{self, Client},
server::{self, Handler, Server},
};
Expand Down Expand Up @@ -45,7 +46,7 @@ async fn bench() -> io::Result<()> {
let mut durations = vec![];
for _ in 1..=total {
let now = Instant::now();
let response = await!(client.send(client::Context::current(), 0u32));
let response = await!(client.send(context::current(), 0u32));
let elapsed = now.elapsed();

match response {
Expand Down
78 changes: 44 additions & 34 deletions bincode-transport/tests/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use futures::{prelude::*, stream};
use humantime::format_duration;
use rand::distributions::{Distribution, Normal};
use rpc::{
context,
client::{self, Client},
server::{self, Handler, Server},
server::{self, Server},
};
use std::{
io,
Expand All @@ -38,31 +39,35 @@ async fn run() -> io::Result<()> {
let server = Server::<String, String>::new(server::Config::default())
.incoming(listener)
.take(1)
.respond_with(|ctx, request| {
let client_addr = ctx.client_addr;

// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);

trace!(
"[{}/{}] Responding to request in {}.",
ctx.trace_id(),
client_addr,
format_duration(delay),
);

let wait = Delay::new(Instant::now() + delay).compat();
async move {
await!(wait).unwrap();
Ok(request)
}
.for_each(async move |channel| {
let channel = if let Ok(channel) = channel { channel } else { return };
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {

// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);

trace!(
"[{}/{}] Responding to request in {}.",
ctx.trace_id(),
client_addr,
format_duration(delay),
);

let wait = Delay::new(Instant::now() + delay).compat();
async move {
await!(wait).unwrap();
Ok(request)
}
});
spawn!(handler);
});

spawn!(server).map_err(|e| {
Expand All @@ -84,13 +89,18 @@ async fn run() -> io::Result<()> {
let proxy_server = Server::<String, String>::new(server::Config::default())
.incoming(listener)
.take(1)
.respond_with(move |ctx, request| {
trace!("[{}/{}] Proxying request.", ctx.trace_id(), ctx.client_addr);

let ctx = ctx.into();
let mut client = client.clone();

async move { await!(client.send(ctx, request)) }
.for_each(move |channel| {
let client = client.clone();
async move {
let channel = if let Ok(channel) = channel { channel } else { return };
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {
trace!("[{}/{}] Proxying request.", ctx.trace_id(), client_addr);
let mut client = client.clone();
async move { await!(client.send(ctx, request)) }
});
spawn!(handler);
}
});

spawn!(proxy_server).map_err(|e| {
Expand All @@ -113,7 +123,7 @@ async fn run() -> io::Result<()> {
let mut clients: Vec<_> = (1..=3u32).map(|_| client.clone()).collect();
let mut requests = vec![];
for client in &mut clients {
let mut ctx = client::Context::current();
let mut ctx = context::current();
ctx.deadline = SystemTime::now() + Duration::from_millis(200);
let trace_id = *ctx.trace_id();
let response = client.send(ctx, "ping");
Expand Down
147 changes: 77 additions & 70 deletions bincode-transport/tests/pushback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ extern crate log;
#[macro_use]
extern crate futures;

use futures::compat::TokioDefaultSpawner;
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::compat::{Future01CompatExt, TokioDefaultSpawner};
use futures::prelude::*;
use humantime::format_duration;
use rand::distributions::{Distribution, Normal};
use rpc::{
context,
client::{self, Client},
server::{self, Handler, Server},
server::{self, Server},
};
use std::{
io,
time::{Duration, Instant, SystemTime},
};
use tokio::{
net::{TcpListener, TcpStream},
timer::Delay,
};

Expand All @@ -36,81 +35,89 @@ impl AsDuration for SystemTime {
}
}

#[test]
fn ping_pong() -> Result<(), io::Error> {
env_logger::init();

let listener = TcpListener::bind(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr()?;
async fn run() -> io::Result<()> {
let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?;
let addr = listener.local_addr();
let server = Server::<String, String>::new(server::Config::default())
.incoming(
listener
.incoming()
.compat()
.take(1)
.map_ok(bincode_transport::new),
).respond_with(|ctx, request| {
let client_addr = ctx.client_addr;

// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);

trace!(
"[{}/{}] Responding to request in {}.",
ctx.trace_id(),
client_addr,
format_duration(delay),
);

let sleep = Delay::new(Instant::now() + delay).compat();
async {
await!(sleep).unwrap();
Ok(request)
.incoming(listener)
.take(1)
.for_each(async move |channel| {
let channel = if let Ok(channel) = channel { channel } else { return };
let client_addr = *channel.client_addr();
let handler = channel.respond_with(move |ctx, request| {

// Sleep for a time sampled from a normal distribution with:
// - mean: 1/2 the deadline.
// - std dev: 1/2 the deadline.
let deadline: Duration = ctx.deadline.as_duration();
let deadline_millis = deadline.as_secs() * 1000 + deadline.subsec_millis() as u64;
let distribution =
Normal::new(deadline_millis as f64 / 2., deadline_millis as f64 / 2.);
let delay_millis = distribution.sample(&mut rand::thread_rng()).max(0.);
let delay = Duration::from_millis(delay_millis as u64);

trace!(
"[{}/{}] Responding to request in {}.",
ctx.trace_id(),
client_addr,
format_duration(delay),
);

let sleep = Delay::new(Instant::now() + delay).compat();
async {
await!(sleep).unwrap();
Ok(request)
}
});
if let Err(e) = spawn!(handler) {
warn!("Couldn't spawn request handler: {:?}", e);
}
});

let client = async move {
let mut config = client::Config::default();
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;

let client = await!(Client::<String, String>::new(
config,
bincode_transport::new(await!(TcpStream::connect(&addr).compat())?)
));

let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
let ctx = client::Context::current();

spawn!(
async move {
let trace_id = *ctx.trace_id();
let response = client.send(ctx, "ping");
match await!(response) {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
spawn!(server).map_err(|e|
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
))?;

let mut config = client::Config::default();
config.max_in_flight_requests = 10;
config.pending_request_buffer = 10;

let conn = await!(bincode_transport::connect(&addr))?;
let client = await!(Client::<String, String>::new(config, conn));

let clients = (1..=100u32).map(|_| client.clone()).collect::<Vec<_>>();
for mut client in clients {
let ctx = context::current();
spawn!(
async move {
let trace_id = *ctx.trace_id();
let response = client.send(ctx, "ping");
match await!(response) {
Ok(response) => info!("[{}] response: {}", trace_id, response),
Err(e) => error!("[{}] request error: {:?}: {}", trace_id, e.kind(), e),
}
);
}
}
).map_err(|e|
io::Error::new(
io::ErrorKind::Other,
format!("Couldn't spawn server: {:?}", e),
))?;
}

Ok::<_, io::Error>(())
};
Ok(())
}

#[test]
fn ping_pong() -> io::Result<()> {
env_logger::init();

tokio::run(
server
.join(client)
.map(|_| println!("done"))
run()
.map_ok(|_| println!("done"))
.map_err(|e| panic!(e.to_string()))
.boxed()
.unit_error()
.compat(TokioDefaultSpawner),
);

Expand Down
30 changes: 0 additions & 30 deletions rpc/src/client/context.rs

This file was deleted.

9 changes: 5 additions & 4 deletions rpc/src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@ impl Stream for CanceledRequests {
#[cfg(test)]
mod tests {
use super::{CanceledRequests, Channel, RequestCancellation, RequestDispatch};
use crate::client::{self, Config};
use crate::context;
use crate::client::Config;
use crate::transport::{self, channel::UnboundedChannel};
use crate::ClientMessage;
use crate::Response;
Expand All @@ -500,7 +501,7 @@ mod tests {
// Test that a request future dropped before it's processed by dispatch will cause the request
// to not be added to the in-flight request map.
let _resp = futures::executor::block_on(channel.start_send(
client::Context::current(),
context::current(),
0,
"hi".to_string(),
)).unwrap();
Expand All @@ -525,7 +526,7 @@ mod tests {
// Test that a request future dropped before it's processed by dispatch will cause the request
// to not be added to the in-flight request map.
let resp = futures::executor::block_on(channel.start_send(
client::Context::current(),
context::current(),
0,
"hi".into(),
)).unwrap();
Expand All @@ -550,7 +551,7 @@ mod tests {
// i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request
// map.
let resp = futures::executor::block_on(channel.start_send(
client::Context::current(),
context::current(),
1,
"hi".into(),
)).unwrap();
Expand Down
3 changes: 1 addition & 2 deletions rpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ use std::{
time::Instant,
};

pub use self::context::Context;
use crate::context::Context;
use crate::ClientMessage;

mod context;
mod dispatch;

/// Sends multiplexed requests to, and receives responses from, a server.
Expand Down
1 change: 1 addition & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern crate log;
#[macro_use]
extern crate serde;

pub mod context;
pub mod client;
pub mod server;
pub mod transport;
Expand Down
Loading

0 comments on commit 4374d5a

Please sign in to comment.