Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grpc tcp connection limiter #3

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
be149b2
Update WRS to 0.8.0 and async_channel to 2.0.0
aspect Oct 28, 2023
33947b2
Add TCP connection limiter service to utils library
biryukovmaxim Oct 28, 2023
bffb7ac
Implement TCP connection limit on gRPC server
biryukovmaxim Oct 28, 2023
b38b4f1
Implement TCP connection limit on gRPC server
biryukovmaxim Oct 28, 2023
1a78c11
Refactor import statements and add http dependency
biryukovmaxim Oct 28, 2023
14a23db
Fix service shutdown in p2p and gRPC connection handlers
biryukovmaxim Oct 28, 2023
8211e8e
Update crate dependencies for tonic, tonic-build, and related modules
biryukovmaxim Oct 28, 2023
d9f7139
Remove gRPC-specific limiter, introduce shared TCP limiter
biryukovmaxim Oct 28, 2023
61ccfe7
"Refactor connection limiter to be protocol agnostic
biryukovmaxim Oct 28, 2023
5115685
"Update TCP Connection Limiter and Remove Redundant Tests
biryukovmaxim Oct 28, 2023
a4ec24c
"Add TCP connections limit in Kaspa
biryukovmaxim Oct 28, 2023
9d65d56
"Add option for maximum TCP connections
biryukovmaxim Oct 28, 2023
359a218
"Exclude tcp_limiter and some dependencies for wasm32 target
biryukovmaxim Oct 28, 2023
652aa72
"Refactor TCP limit calculation in kaspad/src/main.rs
biryukovmaxim Oct 29, 2023
0a42679
"Add todo comments for future TCP limit implementation
biryukovmaxim Oct 29, 2023
eccbebf
"Limit TCP connections in integration tests
biryukovmaxim Oct 29, 2023
6c04495
Merge branch 'master' into grpc-tcp-connection-limiter
biryukovmaxim Oct 30, 2023
f0ac8a4
Adjust file descriptor limit in integration tests
biryukovmaxim Oct 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 138 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ parking_lot = "0.12"
smallvec = { version = "1.10.0", features = ["serde"] }
borsh = { version = "0.9.1", features = ["rc"] } # please keep this fixed
async-std = { version = "1.12.0", features = ['attributes'] }
async-channel = "1.8.0"
async-channel = "2.0.0"
clap = { version = "4.0.23", features = ["derive", "string", "cargo"] }
derive_more = { version = "0.99" }
log = "0.4"
Expand Down Expand Up @@ -207,16 +207,16 @@ pbkdf2 = { version = "0.12.1" }


# workflow dependencies
workflow-d3 = { version = "0.7.0" }
workflow-nw = { version = "0.7.0" }
workflow-log = { version = "0.7.0" }
workflow-core = { version = "0.7.0" }
workflow-wasm = { version = "0.7.0" }
workflow-dom = { version = "0.7.0" }
workflow-rpc = { version = "0.7.0" }
workflow-node = { version = "0.7.0" }
workflow-store = { version = "0.7.0" }
workflow-terminal = { version = "0.7.0" }
workflow-d3 = { version = "0.8.0" }
workflow-nw = { version = "0.8.0" }
workflow-log = { version = "0.8.0" }
workflow-core = { version = "0.8.0" }
workflow-wasm = { version = "0.8.0" }
workflow-dom = { version = "0.8.0" }
workflow-rpc = { version = "0.8.0" }
workflow-node = { version = "0.8.0" }
workflow-store = { version = "0.8.0" }
workflow-terminal = { version = "0.8.0" }
nw-sys = "0.1.5"

# if below is enabled, this means that there is an ongoing work
Expand Down
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ kaspa-consensusmanager.workspace = true
once_cell.workspace = true

crossbeam-channel = "0.5"
async-channel = "1.8.0"
async-channel.workspace = true
secp256k1 = { version = "0.24", features = ["global-context", "rand-std"] }

rand = { workspace = true, features = ["small_rng"] }
Expand Down
13 changes: 12 additions & 1 deletion kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct Args {
pub prealloc_amount: u64,

pub disable_upnp: bool,

pub max_tcp_connections: Option<u16>,
}

impl Default for Args {
Expand Down Expand Up @@ -111,6 +113,7 @@ impl Default for Args {
prealloc_amount: 1_000_000,

disable_upnp: false,
max_tcp_connections: None,
}
}
}
Expand Down Expand Up @@ -316,7 +319,14 @@ pub fn cli() -> Command {
.require_equals(true)
.value_parser(clap::value_parser!(u64))
.help("Interval in seconds for performance metrics collection."),
)
).arg(
Arg::new("max-tcp-conns")
.long("max-tcp-conns")
.value_name("max-tcp-conns")
.require_equals(true)
.value_parser(clap::value_parser!(u16))
.help("Max number of tcp connections."),
)
.arg(arg!(--"disable-upnp" "Disable upnp"));

#[cfg(feature = "devnet-prealloc")]
Expand Down Expand Up @@ -377,6 +387,7 @@ pub fn parse_args() -> Args {
#[cfg(feature = "devnet-prealloc")]
prealloc_amount: m.get_one::<u64>("prealloc-amount").cloned().unwrap_or(defaults.prealloc_amount),
disable_upnp: m.get_one::<bool>("disable-upnp").cloned().unwrap_or(defaults.disable_upnp),
max_tcp_connections: m.get_one::<u16>("max-tcp-conns").cloned(),
}
}

Expand Down
19 changes: 18 additions & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use kaspa_mining::{
use kaspa_p2p_flows::{flow_context::FlowContext, service::P2pService};

use kaspa_perf_monitor::builder::Builder as PerfMonitorBuilder;
use kaspa_utils::fd_budget::acquire_guard;
use kaspa_utils::tcp_limiter::Limit;
use kaspa_utxoindex::{api::UtxoIndexProxy, UtxoIndex};
use kaspa_wrpc_server::service::{Options as WrpcServerOptions, ServerCounters as WrpcServerCounters, WrpcEncoding, WrpcService};

Expand Down Expand Up @@ -177,6 +179,18 @@ pub fn create_core(args: Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreS
pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget: i32) -> (Arc<Core>, Arc<RpcCoreService>) {
let network = args.network();
let mut fd_remaining = fd_total_budget;
let tcp_limit = if let Some(tcp_limit) = args.max_tcp_connections {
tcp_limit as i32
} else {
args.rpc_max_clients as i32 - args.inbound_limit as i32 - args.outbound_target as i32
};
let Ok(_tcp_limit_guard) = acquire_guard(tcp_limit) else {
println!("Oops! Looks like we've hit a system limit. You have a couple of options:");
println!("1. Increase the file limit for this process. If you're on Linux, you can do this with the 'ulimit' command.");
println!("2. Reduce the TCP connection limit by passing the `max-tcp-conns` argument when starting this program.");
exit(1);
};
fd_remaining -= tcp_limit;
let utxo_files_limit = if args.utxoindex {
let utxo_files_limit = fd_remaining * 10 / 100;
fd_remaining -= utxo_files_limit;
Expand All @@ -190,6 +204,7 @@ pub fn create_core_with_runtime(runtime: &Runtime, args: &Args, fd_total_budget:
exit(1);
}

let tcp_limit = Some(Arc::new(Limit::new(tcp_limit)));
let config = Arc::new(
ConfigBuilder::new(network.into())
.adjust_perf_params_to_consensus_params()
Expand Down Expand Up @@ -368,6 +383,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
args.inbound_limit,
dns_seeders,
config.default_p2p_port(),
tcp_limit.clone(),
));

let rpc_core_service = Arc::new(RpcCoreService::new(
Expand All @@ -384,7 +400,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
wrpc_json_counters.clone(),
perf_monitor.clone(),
));
let grpc_service = Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients));
let grpc_service = Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients, tcp_limit.clone()));

// Create an async runtime and register the top-level async services
let async_runtime = Arc::new(AsyncRuntime::new(args.async_threads));
Expand Down Expand Up @@ -419,6 +435,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
WrpcServerOptions {
listen_address: listen_address.to_address(&network.network_type, &encoding).to_string(), // TODO: use a normalized ContextualNetAddress instead of a String
verbose: args.wrpc_verbose,
tcp_limit: tcp_limit.clone(),
..WrpcServerOptions::default()
},
))
Expand Down
4 changes: 2 additions & 2 deletions kaspad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub fn main() {
let _profiler = dhat::Profiler::builder().file_name("kaspad-heap.json").build();

let args = parse_args();
let fd_total_budget = fd_budget::limit() - args.rpc_max_clients as i32 - args.inbound_limit as i32 - args.outbound_target as i32;
let (core, _) = create_core(args, fd_total_budget);

let (core, _) = create_core(args, fd_budget::limit());

// Bind the keyboard signal to the core
Arc::new(Signals::new(&core)).init();
Expand Down
8 changes: 7 additions & 1 deletion protocol/flows/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use kaspa_core::{
trace,
};
use kaspa_p2p_lib::Adaptor;
use kaspa_utils::tcp_limiter::Limit;
use kaspa_utils::triggers::SingleTrigger;

use crate::flow_context::FlowContext;
Expand All @@ -23,6 +24,7 @@ pub struct P2pService {
dns_seeders: &'static [&'static str],
default_port: u16,
shutdown: SingleTrigger,
tcp_limit: Option<Arc<Limit>>,
}

impl P2pService {
Expand All @@ -35,6 +37,7 @@ impl P2pService {
inbound_limit: usize,
dns_seeders: &'static [&'static str],
default_port: u16,
tcp_limit: Option<Arc<Limit>>,
) -> Self {
Self {
flow_context,
Expand All @@ -46,6 +49,7 @@ impl P2pService {
inbound_limit,
dns_seeders,
default_port,
tcp_limit,
}
}
}
Expand All @@ -61,7 +65,9 @@ impl AsyncService for P2pService {
// Prepare a shutdown signal receiver
let shutdown_signal = self.shutdown.listener.clone();

let p2p_adaptor = Adaptor::bidirectional(self.listen, self.flow_context.hub().clone(), self.flow_context.clone()).unwrap();
let p2p_adaptor =
Adaptor::bidirectional(self.listen, self.flow_context.hub().clone(), self.flow_context.clone(), self.tcp_limit.clone())
.unwrap();
let connection_manager = ConnectionManager::new(
p2p_adaptor.clone(),
self.outbound_target,
Expand Down
2 changes: 1 addition & 1 deletion protocol/p2p/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn main() {
// [0] - init p2p-adaptor - server side
let ip_port = NetAddress::from_str("[::1]:50051").unwrap();
let initializer = Arc::new(EchoFlowInitializer::new());
let adaptor = kaspa_p2p_lib::Adaptor::bidirectional(ip_port, kaspa_p2p_lib::Hub::new(), initializer).unwrap();
let adaptor = kaspa_p2p_lib::Adaptor::bidirectional(ip_port, kaspa_p2p_lib::Hub::new(), initializer, None).unwrap();
// [1] - connect to a few peers
let ip_port = String::from("[::1]:16111");
for i in 0..1 {
Expand Down
6 changes: 5 additions & 1 deletion protocol/p2p/src/core/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::core::hub::Hub;
use crate::ConnectionError;
use crate::{core::connection_handler::ConnectionHandler, Router};
use kaspa_utils::networking::NetAddress;
use kaspa_utils::tcp_limiter::Limit;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -58,17 +59,19 @@ impl Adaptor {
serve_address: NetAddress,
hub: Hub,
initializer: Arc<dyn ConnectionInitializer>,
tcp_limit: Option<Arc<Limit>>,
) -> Result<Arc<Self>, ConnectionError> {
let (hub_sender, hub_receiver) = mpsc_channel(Self::hub_channel_size());
let connection_handler = ConnectionHandler::new(hub_sender, initializer.clone());
let server_termination = connection_handler.serve(serve_address)?;
let server_termination = connection_handler.serve(serve_address, tcp_limit)?;
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, hub));
adaptor.hub.clone().start_event_loop(hub_receiver, initializer);
Ok(adaptor)
}

/// Connect to a new peer (no retries)
pub async fn connect_peer(&self, peer_address: String) -> Result<PeerKey, ConnectionError> {
// todo pass tcp limit
self.connection_handler.connect_with_retry(peer_address, 1, Default::default()).await.map(|r| r.key())
}

Expand All @@ -79,6 +82,7 @@ impl Adaptor {
retry_attempts: u8,
retry_interval: Duration,
) -> Result<PeerKey, ConnectionError> {
// todo pass tcp limit
self.connection_handler.connect_with_retry(peer_address, retry_attempts, retry_interval).await.map(|r| r.key())
}

Expand Down
32 changes: 24 additions & 8 deletions protocol/p2p/src/core/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ use crate::{ConnectionInitializer, Router};
use futures::FutureExt;
use kaspa_core::{debug, info};
use kaspa_utils::networking::NetAddress;
use kaspa_utils::tcp_limiter::{Limit, Wrapper};
use std::net::ToSocketAddrs;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel as mpsc_channel, Sender as MpscSender};
use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tokio_stream::StreamExt;
use tonic::transport::{Error as TonicError, Server as TonicServer};
use tonic::{Request, Response, Status as TonicStatus, Streaming};

#[derive(Error, Debug)]
pub enum ConnectionError {
#[error("missing socket address")]
Expand Down Expand Up @@ -54,7 +55,11 @@ impl ConnectionHandler {
}

/// Launches a P2P server listener loop
pub(crate) fn serve(&self, serve_address: NetAddress) -> Result<OneshotSender<()>, ConnectionError> {
pub(crate) fn serve(
&self,
serve_address: NetAddress,
tcp_limit: Option<Arc<Limit>>,
) -> Result<OneshotSender<()>, ConnectionError> {
let (termination_sender, termination_receiver) = oneshot_channel::<()>();
let connection_handler = self.clone();
info!("P2P Server starting on: {}", serve_address);
Expand All @@ -64,11 +69,21 @@ impl ConnectionHandler {
.send_compressed(tonic::codec::CompressionEncoding::Gzip)
.max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);

// TODO: check whether we should set tcp_keepalive
let serve_result = TonicServer::builder()
.add_service(proto_server)
.serve_with_shutdown(serve_address.into(), termination_receiver.map(drop))
.await;
let builder = TonicServer::builder().add_service(proto_server);
let serve_result = if let Some(limit) = tcp_limit {
let listener = TcpListener::bind(serve_address.to_string()).await.unwrap();
let tcp_stream = TcpListenerStream::new(listener).filter_map(|tcp_stream| match tcp_stream {
Ok(tcp_stream) => Wrapper::new(tcp_stream, limit.clone()).map(Ok),
Err(e) => Some(Err(e)),
});
builder
// TODO: check whether we should set tcp_keepalive
.serve_with_incoming_shutdown(tcp_stream, termination_receiver.map(drop))
.await
} else {
// TODO: check whether we should set tcp_keepalive
builder.serve_with_shutdown(serve_address.into(), termination_receiver.map(drop)).await
};

match serve_result {
Ok(_) => info!("P2P Server stopped: {}", serve_address),
Expand Down Expand Up @@ -128,6 +143,7 @@ impl ConnectionHandler {
retry_attempts: u8,
retry_interval: Duration,
) -> Result<Arc<Router>, ConnectionError> {
// todo consider tcp limit
let mut counter = 0;
loop {
counter += 1;
Expand Down
4 changes: 2 additions & 2 deletions protocol/p2p/src/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ mod tests {
kaspa_core::log::try_init_logger("debug");

let address1 = NetAddress::from_str("[::1]:50053").unwrap();
let adaptor1 = Adaptor::bidirectional(address1, Hub::new(), Arc::new(EchoFlowInitializer::new())).unwrap();
let adaptor1 = Adaptor::bidirectional(address1, Hub::new(), Arc::new(EchoFlowInitializer::new()), None).unwrap();

let address2 = NetAddress::from_str("[::1]:50054").unwrap();
let adaptor2 = Adaptor::bidirectional(address2, Hub::new(), Arc::new(EchoFlowInitializer::new())).unwrap();
let adaptor2 = Adaptor::bidirectional(address2, Hub::new(), Arc::new(EchoFlowInitializer::new()), None).unwrap();

// Initiate the connection from `adaptor1` (outbound) to `adaptor2` (inbound)
let peer2_id = adaptor1
Expand Down
4 changes: 3 additions & 1 deletion rpc/grpc/server/src/adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use kaspa_core::debug;
use kaspa_notify::notifier::Notifier;
use kaspa_rpc_core::{api::rpc::DynRpcService, notify::connection::ChannelConnection, Notification, RpcResult};
use kaspa_utils::networking::NetAddress;
use kaspa_utils::tcp_limiter::Limit;
use std::{ops::Deref, sync::Arc};
use tokio::sync::{mpsc::channel as mpsc_channel, oneshot::Sender as OneshotSender};

Expand Down Expand Up @@ -35,10 +36,11 @@ impl Adaptor {
manager: Manager,
core_service: DynRpcService,
core_notifier: Arc<Notifier<Notification, ChannelConnection>>,
tcp_limit: Option<Arc<Limit>>,
) -> Arc<Self> {
let (manager_sender, manager_receiver) = mpsc_channel(Self::manager_channel_size());
let connection_handler = ConnectionHandler::new(manager_sender, core_service.clone(), core_notifier);
let server_termination = connection_handler.serve(serve_address);
let server_termination = connection_handler.serve(serve_address, tcp_limit);
let adaptor = Arc::new(Adaptor::new(Some(server_termination), connection_handler, manager, serve_address));
adaptor.manager.clone().start_event_loop(manager_receiver);
adaptor.start();
Expand Down
Loading
Loading