Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rft: add Cluster to ClientBuilder #142

Merged
merged 16 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions dubbo-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ pub fn generate<T: Service>(
}
}

pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self {
self.inner = self.inner.with_cluster(invoker);
self
}

#methods

}
Expand Down
4 changes: 2 additions & 2 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https:/apache/dubbo-rust.git"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = { version = "0.14.19", features = ["full"] }
hyper = { version = "0.14.26", features = ["full"] }
http = "0.2"
tower-service.workspace = true
http-body = "0.4.4"
Expand All @@ -33,7 +33,7 @@ futures.workspace = true
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
aws-smithy-http = "0.55.2"
dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
Expand Down
69 changes: 19 additions & 50 deletions dubbo/src/cluster/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,21 @@ use std::{
};

use crate::{
codegen::TripleInvoker,
invocation::{Invocation, RpcInvocation},
registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
protocol::BoxInvoker,
registry::{memory_registry::MemoryNotifyListener, BoxRegistry},
};
use dubbo_base::Url;
use dubbo_logger::tracing;

use crate::cluster::Directory;

/// Directory.
///
/// [Directory Service](http://en.wikipedia.org/wiki/Directory_service)
pub trait Directory: Debug + DirectoryClone {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url>;
}

pub trait DirectoryClone {
fn clone_box(&self) -> Box<dyn Directory>;
}

impl<T> DirectoryClone for T
where
T: 'static + Directory + Clone,
{
fn clone_box(&self) -> Box<dyn Directory> {
Box::new(self.clone())
}
}

impl Clone for Box<dyn Directory> {
fn clone(&self) -> Box<dyn Directory> {
self.clone_box()
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StaticDirectory {
uri: http::Uri,
}
Expand All @@ -78,51 +60,36 @@ impl StaticDirectory {
}

impl Directory for StaticDirectory {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
self.uri.port().unwrap(),
invocation.get_target_service_unique_name(),
))
.unwrap();
vec![url]
}
}

impl DirectoryClone for StaticDirectory {
fn clone_box(&self) -> Box<dyn Directory> {
Box::new(StaticDirectory {
uri: self.uri.clone(),
})
let invoker = Box::new(TripleInvoker::new(url));
vec![invoker]
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RegistryDirectory {
registry: RegistryWrapper,
registry: Arc<BoxRegistry>,
service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
}

impl RegistryDirectory {
pub fn new(registry: BoxRegistry) -> RegistryDirectory {
RegistryDirectory {
registry: RegistryWrapper {
registry: Some(registry),
},
registry: Arc::new(registry),
service_instances: Arc::new(RwLock::new(HashMap::new())),
}
}
}

impl DirectoryClone for RegistryDirectory {
fn clone_box(&self) -> Box<dyn Directory> {
todo!()
}
}

impl Directory for RegistryDirectory {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<Url> {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let service_name = invocation.get_target_service_unique_name();

let url = Url::from_url(&format!(
Expand All @@ -132,9 +99,6 @@ impl Directory for RegistryDirectory {
.unwrap();

self.registry
.registry
.as_ref()
.expect("msg")
.subscribe(
url,
Arc::new(MemoryNotifyListener {
Expand All @@ -149,6 +113,11 @@ impl Directory for RegistryDirectory {
.expect("service_instances.read");
let binding = Vec::new();
let url_vec = map.get(&service_name).unwrap_or(&binding);
url_vec.to_vec()
// url_vec.to_vec()
let mut invokers: Vec<BoxInvoker> = vec![];
for item in url_vec.iter() {
invokers.push(Box::new(TripleInvoker::new(item.clone())));
}
invokers
}
}
31 changes: 16 additions & 15 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};

use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
use dyn_clone::DynClone;

use crate::{
empty_body,
Expand All @@ -28,13 +29,14 @@ use crate::{

pub mod directory;
pub mod loadbalance;
pub mod support;

pub trait Directory: Debug {
pub trait Directory: Debug + DynClone {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
fn is_empty(&self) -> bool;
// fn is_empty(&self) -> bool;
}

dyn_clone::clone_trait_object!(Directory);

type BoxDirectory = Box<dyn Directory + Send + Sync>;

pub trait Cluster {
Expand Down Expand Up @@ -76,21 +78,20 @@ impl Invoker<http::Request<SdkBody>> for FailoverCluster {
}

fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
println!("req: {}", req.body().content_length().unwrap());
let clone_body = req.body().try_clone().unwrap();
let mut clone_req = http::Request::builder()
.uri(req.uri().clone())
.method(req.method().clone());
*clone_req.headers_mut().unwrap() = req.headers().clone();
let r = clone_req.body(clone_body).unwrap();
// let clone_body = req.body().try_clone().unwrap();
// let mut clone_req = http::Request::builder()
// .uri(req.uri().clone())
// .method(req.method().clone());
// *clone_req.headers_mut().unwrap() = req.headers().clone();
// let r = clone_req.body(clone_body).unwrap();
let invokers = self.dir.list(
RpcInvocation::default()
.with_service_unique_name("hello".to_string())
.into(),
);
for mut invoker in invokers {
let fut = async move {
let res = invoker.call(r).await;
let res = invoker.call(req).await;
return res;
};
return Box::pin(fut);
Expand All @@ -110,7 +111,7 @@ impl Invoker<http::Request<SdkBody>> for FailoverCluster {
}
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct MockDirectory {
// router_chain: RouterChain,
invokers: Vec<BoxInvoker>,
Expand All @@ -134,9 +135,9 @@ impl Directory for MockDirectory {
self.invokers.clone()
}

fn is_empty(&self) -> bool {
false
}
// fn is_empty(&self) -> bool {
// false
// }
}

#[derive(Debug, Default)]
Expand Down
147 changes: 0 additions & 147 deletions dubbo/src/cluster/support/cluster_invoker.rs

This file was deleted.

Loading