diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 1e1c9fbd..bfcfe45b 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -90,11 +90,6 @@ pub fn generate( } } - pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self { - self.inner = self.inner.with_cluster(invoker); - self - } - #methods } diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 91b19d6f..d0ab2a4d 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/apache/dubbo-rust.git" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -hyper = { version = "0.14.19", features = ["full"] } +hyper = { version = "0.14.26", features = ["full"] } http = "0.2" tower-service.workspace = true http-body = "0.4.4" @@ -33,7 +33,7 @@ futures.workspace = true axum = "0.5.9" async-stream = "0.3" flate2 = "1.0" -aws-smithy-http = "0.54.1" +aws-smithy-http = "0.55.2" dyn-clone = "1.0.11" itertools.workspace = true urlencoding.workspace = true diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index e74fc6d4..afe9657b 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -23,39 +23,21 @@ use std::{ }; use crate::{ + codegen::TripleInvoker, invocation::{Invocation, RpcInvocation}, - registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper}, + protocol::BoxInvoker, + registry::{memory_registry::MemoryNotifyListener, BoxRegistry}, }; use dubbo_base::Url; use dubbo_logger::tracing; +use crate::cluster::Directory; + /// Directory. /// /// [Directory Service](http://en.wikipedia.org/wiki/Directory_service) -pub trait Directory: Debug + DirectoryClone { - fn list(&self, invocation: Arc) -> Vec; -} -pub trait DirectoryClone { - fn clone_box(&self) -> Box; -} - -impl DirectoryClone for T -where - T: 'static + Directory + Clone, -{ - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } -} - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StaticDirectory { uri: http::Uri, } @@ -78,7 +60,7 @@ impl StaticDirectory { } impl Directory for StaticDirectory { - fn list(&self, invocation: Arc) -> Vec { + fn list(&self, invocation: Arc) -> Vec { let url = Url::from_url(&format!( "tri://{}:{}/{}", self.uri.host().unwrap(), @@ -86,43 +68,28 @@ impl Directory for StaticDirectory { invocation.get_target_service_unique_name(), )) .unwrap(); - vec![url] - } -} - -impl DirectoryClone for StaticDirectory { - fn clone_box(&self) -> Box { - Box::new(StaticDirectory { - uri: self.uri.clone(), - }) + let invoker = Box::new(TripleInvoker::new(url)); + vec![invoker] } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RegistryDirectory { - registry: RegistryWrapper, + registry: Arc, service_instances: Arc>>>, } impl RegistryDirectory { pub fn new(registry: BoxRegistry) -> RegistryDirectory { RegistryDirectory { - registry: RegistryWrapper { - registry: Some(registry), - }, + registry: Arc::new(registry), service_instances: Arc::new(RwLock::new(HashMap::new())), } } } -impl DirectoryClone for RegistryDirectory { - fn clone_box(&self) -> Box { - todo!() - } -} - impl Directory for RegistryDirectory { - fn list(&self, invocation: Arc) -> Vec { + fn list(&self, invocation: Arc) -> Vec { let service_name = invocation.get_target_service_unique_name(); let url = Url::from_url(&format!( @@ -132,9 +99,6 @@ impl Directory for RegistryDirectory { .unwrap(); self.registry - .registry - .as_ref() - .expect("msg") .subscribe( url, Arc::new(MemoryNotifyListener { @@ -149,6 +113,11 @@ impl Directory for RegistryDirectory { .expect("service_instances.read"); let binding = Vec::new(); let url_vec = map.get(&service_name).unwrap_or(&binding); - url_vec.to_vec() + // url_vec.to_vec() + let mut invokers: Vec = vec![]; + for item in url_vec.iter() { + invokers.push(Box::new(TripleInvoker::new(item.clone()))); + } + invokers } } diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 4f73d2f5..d1f96f95 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll}; use aws_smithy_http::body::SdkBody; use dubbo_base::Url; +use dyn_clone::DynClone; use crate::{ empty_body, @@ -28,13 +29,14 @@ use crate::{ pub mod directory; pub mod loadbalance; -pub mod support; -pub trait Directory: Debug { +pub trait Directory: Debug + DynClone { fn list(&self, invocation: Arc) -> Vec; - fn is_empty(&self) -> bool; + // fn is_empty(&self) -> bool; } +dyn_clone::clone_trait_object!(Directory); + type BoxDirectory = Box; pub trait Cluster { @@ -76,13 +78,12 @@ impl Invoker> for FailoverCluster { } fn call(&mut self, req: http::Request) -> Self::Future { - println!("req: {}", req.body().content_length().unwrap()); - let clone_body = req.body().try_clone().unwrap(); - let mut clone_req = http::Request::builder() - .uri(req.uri().clone()) - .method(req.method().clone()); - *clone_req.headers_mut().unwrap() = req.headers().clone(); - let r = clone_req.body(clone_body).unwrap(); + // let clone_body = req.body().try_clone().unwrap(); + // let mut clone_req = http::Request::builder() + // .uri(req.uri().clone()) + // .method(req.method().clone()); + // *clone_req.headers_mut().unwrap() = req.headers().clone(); + // let r = clone_req.body(clone_body).unwrap(); let invokers = self.dir.list( RpcInvocation::default() .with_service_unique_name("hello".to_string()) @@ -90,7 +91,7 @@ impl Invoker> for FailoverCluster { ); for mut invoker in invokers { let fut = async move { - let res = invoker.call(r).await; + let res = invoker.call(req).await; return res; }; return Box::pin(fut); @@ -110,7 +111,7 @@ impl Invoker> for FailoverCluster { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct MockDirectory { // router_chain: RouterChain, invokers: Vec, @@ -134,9 +135,9 @@ impl Directory for MockDirectory { self.invokers.clone() } - fn is_empty(&self) -> bool { - false - } + // fn is_empty(&self) -> bool { + // false + // } } #[derive(Debug, Default)] diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs deleted file mode 100644 index 0ccca488..00000000 --- a/dubbo/src/cluster/support/cluster_invoker.rs +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use aws_smithy_http::body::SdkBody; -use std::{str::FromStr, sync::Arc}; - -use dubbo_base::Url; -use http::{uri::PathAndQuery, Request}; - -use crate::{ - cluster::{ - loadbalance::{types::BoxLoadBalance, LOAD_BALANCE_EXTENSIONS}, - support::DEFAULT_LOADBALANCE, - }, - codegen::{Directory, RegistryDirectory, TripleClient}, - invocation::RpcInvocation, -}; - -#[derive(Debug, Clone)] -pub struct ClusterInvoker { - directory: Arc, - destroyed: bool, -} - -pub trait ClusterInvokerSelector { - /// Select a invoker using loadbalance policy. - fn select( - &self, - invocation: Arc, - invokers: Arc>, - excluded: Arc>, - ) -> Option; - - fn do_select( - &self, - loadbalance_key: Option<&str>, - invocation: Arc, - invokers: Arc>, - ) -> Option; -} - -pub trait ClusterRequestBuilder { - fn build_req( - &self, - triple_client: &mut TripleClient, - path: http::uri::PathAndQuery, - invocation: Arc, - body: SdkBody, - ) -> http::Request; -} - -impl ClusterInvoker { - pub fn with_directory(registry_directory: RegistryDirectory) -> Self { - ClusterInvoker { - directory: Arc::new(registry_directory), - destroyed: false, - } - } - - pub fn directory(&self) -> Arc { - self.directory.clone() - } - - pub fn init_loadbalance(&self, loadbalance_key: &str) -> &BoxLoadBalance { - if LOAD_BALANCE_EXTENSIONS.contains_key(loadbalance_key) { - LOAD_BALANCE_EXTENSIONS.get(loadbalance_key).unwrap() - } else { - println!( - "loadbalance {} not found, use default loadbalance {}", - loadbalance_key, DEFAULT_LOADBALANCE - ); - LOAD_BALANCE_EXTENSIONS.get(DEFAULT_LOADBALANCE).unwrap() - } - } - - pub fn is_available(&self, invocation: Arc) -> bool { - !self.destroyed() && !self.directory.list(invocation).is_empty() - } - - pub fn destroyed(&self) -> bool { - self.destroyed - } -} - -impl ClusterInvokerSelector for ClusterInvoker { - fn select( - &self, - invocation: Arc, - invokers: Arc>, - _excluded: Arc>, - ) -> Option { - if invokers.is_empty() { - return None; - } - let instance_count = invokers.len(); - return if instance_count == 1 { - Some(invokers.as_ref().first()?.clone()) - } else { - let loadbalance = Some(DEFAULT_LOADBALANCE); - self.do_select(loadbalance, invocation, invokers) - }; - } - - /// picking instance invoker url from registry directory - fn do_select( - &self, - loadbalance_key: Option<&str>, - invocation: Arc, - invokers: Arc>, - ) -> Option { - let loadbalance = self.init_loadbalance(loadbalance_key.unwrap_or(DEFAULT_LOADBALANCE)); - loadbalance.select(invokers, None, invocation) - } -} - -impl ClusterRequestBuilder for ClusterInvoker { - fn build_req( - &self, - triple_client: &mut TripleClient, - path: PathAndQuery, - invocation: Arc, - body: SdkBody, - ) -> Request { - let invokers = self.directory.list(invocation.clone()); - let invoker_url = self - .select(invocation, Arc::new(invokers), Arc::new(Vec::new())) - .expect("no valid provider"); - let http_uri = - http::Uri::from_str(&format!("http://{}:{}/", invoker_url.ip, invoker_url.port)) - .unwrap(); - triple_client.map_request(http_uri, path, body) - } -} diff --git a/dubbo/src/cluster/support/mod.rs b/dubbo/src/cluster/support/mod.rs deleted file mode 100644 index ae42cc28..00000000 --- a/dubbo/src/cluster/support/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -pub mod cluster_invoker; - -pub const DEFAULT_LOADBALANCE: &str = "random"; diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 98d784fc..412feb91 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -27,14 +27,11 @@ pub use hyper::Body as hyperBody; pub use tower_service::Service; pub use super::{ - cluster::{ - directory::{Directory, RegistryDirectory}, - support::cluster_invoker::ClusterInvoker, - }, + cluster::directory::RegistryDirectory, empty_body, invocation::{IntoStreamingRequest, Request, Response, RpcInvocation}, protocol::{triple::triple_invoker::TripleInvoker, Invoker}, - registry::{BoxRegistry, Registry, RegistryWrapper}, + registry::{BoxRegistry, Registry}, triple::{ client::TripleClient, codec::{prost::ProstCodec, Codec}, diff --git a/dubbo/src/registry/integration.rs b/dubbo/src/registry/integration.rs index 15b82d01..2944f981 100644 --- a/dubbo/src/registry/integration.rs +++ b/dubbo/src/registry/integration.rs @@ -14,10 +14,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use crate::{cluster::support::cluster_invoker::ClusterInvoker, registry::BoxRegistry}; -use std::sync::Arc; - -pub trait ClusterRegistryIntegration { - /// get cluster invoker struct - fn get_invoker(registry: BoxRegistry) -> Option>; -} diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index 31106f0f..2a95452a 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -60,20 +60,3 @@ impl Debug for BoxRegistry { f.write_str("BoxRegistry") } } - -#[derive(Default)] -pub struct RegistryWrapper { - pub registry: Option>, -} - -impl Clone for RegistryWrapper { - fn clone(&self) -> Self { - Self { registry: None } - } -} - -impl Debug for RegistryWrapper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RegistryWrapper").finish() - } -} diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 29957a6f..06ecd627 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -15,9 +15,12 @@ * limitations under the License. */ +use std::sync::Arc; + use crate::{ - cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory}, - codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker}, + cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory}, + codegen::{RegistryDirectory, RpcInvocation, TripleInvoker}, + protocol::BoxInvoker, triple::compression::CompressionEncoding, utils::boxed_clone::BoxCloneService, }; @@ -35,7 +38,6 @@ pub struct ClientBuilder { pub timeout: Option, pub connector: &'static str, directory: Option>, - cluster_invoker: Option, pub direct: bool, host: String, } @@ -46,7 +48,6 @@ impl ClientBuilder { timeout: None, connector: "", directory: None, - cluster_invoker: None, direct: false, host: "".to_string(), } @@ -57,7 +58,6 @@ impl ClientBuilder { timeout: None, connector: "", directory: Some(Box::new(StaticDirectory::new(&host))), - cluster_invoker: None, direct: true, host: host.clone().to_string(), } @@ -74,15 +74,13 @@ impl ClientBuilder { pub fn with_directory(self, directory: Box) -> Self { Self { directory: Some(directory), - cluster_invoker: None, ..self } } pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self { Self { - directory: None, - cluster_invoker: Some(ClusterInvoker::with_directory(registry)), + directory: Some(Box::new(registry)), ..self } } @@ -97,7 +95,6 @@ impl ClientBuilder { pub fn with_connector(self, connector: &'static str) -> Self { Self { connector: connector, - cluster_invoker: None, ..self } } @@ -106,25 +103,31 @@ impl ClientBuilder { Self { direct, ..self } } - pub fn build(self) -> TripleClient { + pub(crate) fn direct_build(self) -> TripleClient { let mut cli = TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), - directory: self.directory, - cluster_invoker: self.cluster_invoker, + builder: Some(self.clone()), invoker: None, }; + cli.invoker = Some(Box::new(TripleInvoker::new( + Url::from_url(&self.host).unwrap(), + ))); + return cli; + } + + pub fn build(self, invocation: Arc) -> Option { if self.direct { - cli.invoker = Some(Box::new(TripleInvoker::new( + return Some(Box::new(TripleInvoker::new( Url::from_url(&self.host).unwrap(), ))); - return cli; } + let invokers = match self.directory { + Some(v) => v.list(invocation), + None => panic!("use direct connection"), + }; - let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new( - TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()), - )]))); + let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers))); - cli.invoker = Some(cluster); - cli + return Some(cluster); } } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index eb7934dd..124cfcfd 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -15,20 +15,17 @@ * limitations under the License. */ -use std::{str::FromStr, sync::Arc}; +use std::str::FromStr; use futures_util::{future, stream, StreamExt, TryStreamExt}; use aws_smithy_http::body::SdkBody; use http::HeaderValue; -use rand::prelude::SliceRandom; -use tower_service::Service; -use super::{super::transport::connection::Connection, builder::ClientBuilder}; -use crate::codegen::{ClusterInvoker, Directory, RpcInvocation}; +use super::builder::ClientBuilder; +use crate::codegen::RpcInvocation; use crate::{ - cluster::support::cluster_invoker::ClusterRequestBuilder, invocation::{IntoStreamingRequest, Metadata, Request, Response}, protocol::BoxInvoker, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, @@ -37,8 +34,7 @@ use crate::{ #[derive(Debug, Clone, Default)] pub struct TripleClient { pub(crate) send_compression_encoding: Option, - pub(crate) directory: Option>, - pub(crate) cluster_invoker: Option, + pub(crate) builder: Option, pub invoker: Option, } @@ -46,17 +42,14 @@ impl TripleClient { pub fn connect(host: String) -> Self { let builder = ClientBuilder::from_static(&host).with_direct(true); - builder.build() + builder.direct_build() } pub fn new(builder: ClientBuilder) -> Self { - builder.build() - } - - pub fn with_cluster(self, invoker: ClusterInvoker) -> Self { TripleClient { - cluster_invoker: Some(invoker), - ..self + send_compression_encoding: Some(CompressionEncoding::Gzip), + builder: Some(builder), + invoker: None, } } @@ -137,7 +130,7 @@ impl TripleClient { req: Request, mut codec: C, path: http::uri::PathAndQuery, - _invocation: RpcInvocation, + invocation: RpcInvocation, ) -> Result, crate::status::Status> where C: Codec, @@ -155,8 +148,16 @@ impl TripleClient { let bytes = hyper::body::to_bytes(body).await.unwrap(); let sdk_body = SdkBody::from(bytes); - // let mut conn = Connection::new().with_host(http_uri); - let mut conn = self.invoker.clone().unwrap(); + let mut conn = match self.invoker.clone() { + Some(v) => v, + None => self + .builder + .clone() + .unwrap() + .build(invocation.into()) + .unwrap(), + }; + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); let req = self.map_request(http_uri.clone(), path, sdk_body); @@ -214,25 +215,20 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(en); let sdk_body = SdkBody::from(body); - let arc_invocation = Arc::new(invocation); - let req; - let http_uri; - if self.cluster_invoker.is_some() { - let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone(); - req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body); - http_uri = req.uri().clone(); - } else { - let url_list = self - .directory - .as_ref() - .expect("msg") - .list(arc_invocation.clone()); - let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); - http_uri = - http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - req = self.map_request(http_uri.clone(), path, sdk_body); - } - let mut conn = Connection::new().with_host(http_uri); + + let mut conn = match self.invoker.clone() { + Some(v) => v, + None => self + .builder + .clone() + .unwrap() + .build(invocation.into()) + .unwrap(), + }; + + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); + let req = self.map_request(http_uri.clone(), path, sdk_body); + let response = conn .call(req) .await @@ -271,25 +267,21 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(en); let sdk_body = SdkBody::from(body); - let arc_invocation = Arc::new(invocation); - let req; - let http_uri; - if self.cluster_invoker.is_some() { - let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone(); - req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body); - http_uri = req.uri().clone(); - } else { - let url_list = self - .directory - .as_ref() - .expect("msg") - .list(arc_invocation.clone()); - let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); - http_uri = - http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - req = self.map_request(http_uri.clone(), path, sdk_body); - } - let mut conn = Connection::new().with_host(http_uri); + + let mut conn = match self.invoker.clone() { + Some(v) => v, + None => self + .builder + .clone() + .unwrap() + .build(invocation.into()) + .unwrap(), + }; + + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); + let req = self.map_request(http_uri.clone(), path, sdk_body); + + // let mut conn = Connection::new().with_host(http_uri); let response = conn .call(req) .await @@ -344,26 +336,19 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(en); let sdk_body = SdkBody::from(body); - let arc_invocation = Arc::new(invocation); - let req; - let http_uri; - if self.cluster_invoker.is_some() { - let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone(); - req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body); - http_uri = req.uri().clone(); - } else { - let url_list = self - .directory - .as_ref() - .expect("msg") - .list(arc_invocation.clone()); - let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg"); - http_uri = - http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap(); - req = self.map_request(http_uri.clone(), path, sdk_body); - } - let mut conn = Connection::new().with_host(http_uri); + let mut conn = match self.invoker.clone() { + Some(v) => v, + None => self + .builder + .clone() + .unwrap() + .build(invocation.into()) + .unwrap(), + }; + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); + let req = self.map_request(http_uri.clone(), path, sdk_body); + let response = conn .call(req) .await diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml index 319c17b3..bc6638b8 100644 --- a/examples/echo/Cargo.toml +++ b/examples/echo/Cargo.toml @@ -30,9 +30,7 @@ async-trait = "0.1.56" tokio-stream = "0.1" dubbo-logger.workspace=true -hyper = { version = "0.14.19", features = ["full"]} - -dubbo = {path = "../../dubbo", version = "0.3.0" } +dubbo = {path = "../../dubbo"} dubbo-config = {path = "../../config", version = "0.3.0" } registry-zookeeper.workspace=true diff --git a/examples/echo/src/echo/client.rs b/examples/echo/src/echo/client.rs index db46958c..0a2f150b 100644 --- a/examples/echo/src/echo/client.rs +++ b/examples/echo/src/echo/client.rs @@ -34,7 +34,9 @@ async fn main() { // let builder = ClientBuilder::new() // .with_connector("unix") // .with_host("unix://127.0.0.1:8888"); - let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888").with_timeout(1000000); + let builder = ClientBuilder::from_static(&"http://127.0.0.1:8888") + .with_timeout(1000000) + .with_direct(true); let mut cli = EchoClient::new(builder); // let mut unary_cli = cli.clone().with_filter(FakeFilter {}); // let mut cli = EchoClient::build(ClientBuilder::from_static("http://127.0.0.1:8888")); diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml index 6ad3b1b5..d68cab7b 100644 --- a/examples/greeter/Cargo.toml +++ b/examples/greeter/Cargo.toml @@ -29,7 +29,7 @@ prost = "0.10.4" async-trait = "0.1.56" tokio-stream = "0.1" dubbo-logger = { path = "../../common/logger" } -dubbo = { path = "../../dubbo", version = "0.3.0" } +dubbo = { path = "../../dubbo"} dubbo-config = { path = "../../config", version = "0.3.0" } registry-zookeeper.workspace = true registry-nacos.workspace = true diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs index 5debc0ab..e8c2c5ce 100644 --- a/registry/zookeeper/src/lib.rs +++ b/registry/zookeeper/src/lib.rs @@ -34,11 +34,9 @@ use serde::{Deserialize, Serialize}; use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKeeper}; use dubbo::{ - cluster::support::cluster_invoker::ClusterInvoker, - codegen::BoxRegistry, registry::{ - integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener, - Registry, RegistryNotifyListener, ServiceEvent, + memory_registry::MemoryRegistry, NotifyListener, Registry, RegistryNotifyListener, + ServiceEvent, }, StdError, }; @@ -371,12 +369,6 @@ impl NotifyListener for ServiceInstancesChangedListener { } } -impl ClusterRegistryIntegration for ZookeeperRegistry { - fn get_invoker(registry: BoxRegistry) -> Option> { - todo!() - } -} - #[cfg(test)] mod tests { use std::sync::Arc;