Support a seed broker hostname that resolves to multiple addresses
By default, the Java implementation of the Kafka client can use
each of the IP addresses returned for a hostname.
cf. https://kafka.apache.org/documentation/#client.dns.lookup

librdkafka behaves the same way:
> If host resolves to multiple addresses librdkafka will
> round-robin the addresses for each connection attempt.

cf. https://github.com/edenhill/librdkafka/blob/v1.5.3/INTRODUCTION.md#brokers

Such a feature makes it easier to manage the seed brokers'
IP addresses, so this commit implements the same behavior.
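
As an illustrative sketch (not part of the commit), the resolve-then-try-each pattern described above looks roughly like this in plain Ruby; the hostname, port, and addresses are placeholders:

```ruby
require "resolv"
require "socket"

# Resolve the seed hostname once; a DNS record with multiple A/AAAA
# entries yields several addresses, e.g. ["10.0.0.1", "10.0.0.2"].
addresses = Resolv.getaddresses("seed-brokers")

# Try the addresses in random order until one accepts a connection,
# mirroring the round-robin behavior librdkafka describes above.
addresses.shuffle.each do |address|
  begin
    socket = TCPSocket.new(address, 9092)
    socket.close
    break # connected; stop trying further addresses
  rescue SystemCallError => e
    warn "#{address} unreachable: #{e.message}"
  end
end
```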
abicky committed Apr 14, 2021
1 parent a56d16b commit 99fcabb
Showing 5 changed files with 85 additions and 24 deletions.
1 change: 0 additions & 1 deletion .ruby-version

This file was deleted.

6 changes: 6 additions & 0 deletions README.md
@@ -176,6 +176,12 @@ require "kafka"
 kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
 ```
 
+You can also use a single hostname that resolves to the seed brokers' IP addresses:
+
+```ruby
+kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
+```
+
 ### Producing Messages to Kafka
 
 The simplest way to write a message to a Kafka topic is to call `#deliver_message`:
9 changes: 8 additions & 1 deletion lib/kafka/client.rb
@@ -74,16 +74,22 @@ class Client
     #   the SSL certificate and the signing chain of the certificate have the correct domains
     #   based on the CA certificate
     #
+    # @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
+    #   If a broker is resolved to multiple IP addresses, the client tries to connect to each
+    #   of the addresses until it can connect.
+    #
     # @return [Client]
     def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
                    ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
                    ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
                    sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
                    sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
-                   sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
+                   sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
+                   resolve_seed_brokers: false)
       @logger = TaggedLogger.new(logger)
       @instrumenter = Instrumenter.new(client_id: client_id)
       @seed_brokers = normalize_seed_brokers(seed_brokers)
+      @resolve_seed_brokers = resolve_seed_brokers
 
       ssl_context = SslContext.build(
         ca_cert_file_path: ssl_ca_cert_file_path,
@@ -809,6 +815,7 @@ def initialize_cluster
         seed_brokers: @seed_brokers,
         broker_pool: broker_pool,
         logger: @logger,
+        resolve_seed_brokers: @resolve_seed_brokers,
       )
     end
 
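As a usage sketch for the new keyword (the hostname below is a placeholder), note that `resolve_seed_brokers` defaults to `false`, so existing callers keep the old behavior:

```ruby
require "kafka"

# Default: the hostname is handed to the connection layer unchanged.
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application")

# Opt in: every IP address behind the hostname becomes a bootstrap
# candidate, tried until one connection succeeds.
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
```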
50 changes: 28 additions & 22 deletions lib/kafka/cluster.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require "kafka/broker_pool"
+require "resolv"
 require "set"
 
 module Kafka
@@ -18,14 +19,16 @@ class Cluster
     # @param seed_brokers [Array<URI>]
     # @param broker_pool [Kafka::BrokerPool]
     # @param logger [Logger]
-    def initialize(seed_brokers:, broker_pool:, logger:)
+    # @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}
+    def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
       if seed_brokers.empty?
         raise ArgumentError, "At least one seed broker must be configured"
       end
 
       @logger = TaggedLogger.new(logger)
       @seed_brokers = seed_brokers
       @broker_pool = broker_pool
+      @resolve_seed_brokers = resolve_seed_brokers
       @cluster_info = nil
       @stale = true
 
@@ -418,32 +421,35 @@ def get_leader_id(topic, partition)
     # @return [Protocol::MetadataResponse] the cluster metadata.
     def fetch_cluster_info
       errors = []
 
       @seed_brokers.shuffle.each do |node|
-        @logger.info "Fetching cluster metadata from #{node}"
-
-        begin
-          broker = @broker_pool.connect(node.hostname, node.port)
-          cluster_info = broker.fetch_metadata(topics: @target_topics)
-
-          if cluster_info.brokers.empty?
-            @logger.error "No brokers in cluster"
-          else
-            @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
-
-            @stale = false
-
-            return cluster_info
-          end
-        rescue Error => e
-          @logger.error "Failed to fetch metadata from #{node}: #{e}"
-          errors << [node, e]
-        ensure
-          broker.disconnect unless broker.nil?
-        end
+        (@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
+          node_info = node.to_s
+          node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
+          @logger.info "Fetching cluster metadata from #{node_info}"
+
+          begin
+            broker = @broker_pool.connect(hostname_or_ip, node.port)
+            cluster_info = broker.fetch_metadata(topics: @target_topics)
+
+            if cluster_info.brokers.empty?
+              @logger.error "No brokers in cluster"
+            else
+              @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
+
+              @stale = false
+
+              return cluster_info
+            end
+          rescue Error => e
+            @logger.error "Failed to fetch metadata from #{node_info}: #{e}"
+            errors << [node_info, e]
+          ensure
+            broker.disconnect unless broker.nil?
+          end
+        end
       end
 
-      error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
+      error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")
 
       raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
     end
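The heart of the change is the ternary at the top of the inner loop. A minimal sketch that isolates it (placeholder hostname; actual output depends on DNS):

```ruby
require "resolv"

# Mirrors the expression in fetch_cluster_info: resolve only when the
# option is enabled, otherwise keep the hostname as the sole candidate.
def candidate_addresses(hostname, resolve_seed_brokers:)
  resolve_seed_brokers ? Resolv.getaddresses(hostname).shuffle : [hostname]
end

candidate_addresses("seed-brokers", resolve_seed_brokers: false)
# => ["seed-brokers"]
candidate_addresses("seed-brokers", resolve_seed_brokers: true)
# => e.g. ["10.0.0.2", "10.0.0.1"] (order shuffled on every call)
```

Shuffling serves the same purpose as the existing `@seed_brokers.shuffle`: it spreads bootstrap connections across all addresses rather than always hitting the first DNS result.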
43 changes: 43 additions & 0 deletions spec/cluster_spec.rb
@@ -103,4 +103,47 @@
       }.to raise_exception(ArgumentError)
     end
   end
+
+  describe "#cluster_info" do
+    let(:cluster) {
+      Kafka::Cluster.new(
+        seed_brokers: [URI("kafka://test1:9092")],
+        broker_pool: broker_pool,
+        logger: LOGGER,
+        resolve_seed_brokers: resolve_seed_brokers,
+      )
+    }
+
+    before do
+      allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" }
+      allow(broker).to receive(:disconnect)
+    end
+
+    context "when resolve_seed_brokers is false" do
+      let(:resolve_seed_brokers) { false }
+
+      it "tries the seed broker hostnames as is" do
+        expect(broker_pool).to receive(:connect).with("test1", 9092) { broker }
+        expect {
+          cluster.cluster_info
+        }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out})
+      end
+    end
+
+    context "when resolve_seed_brokers is true" do
+      let(:resolve_seed_brokers) { true }
+
+      before do
+        allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
+      end
+
+      it "tries all the resolved IP addresses" do
+        expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker }
+        expect(broker_pool).to receive(:connect).with("::1", 9092) { broker }
+        expect {
+          cluster.cluster_info
+        }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out})
+      end
+    end
+  end
 end
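
A natural follow-up spec, sketched here rather than taken from the commit, would cover the fallback-success path: the first resolved address times out and the second returns metadata. It reuses the `cluster`, `broker`, and `broker_pool` doubles from above; the `metadata` and `healthy_broker` doubles are hypothetical, and the sketch assumes `#cluster_info` returns the fetched metadata:

```ruby
context "when only one resolved address is reachable" do
  let(:resolve_seed_brokers) { true }
  let(:metadata) { double(:metadata, brokers: ["test1:9092 (node_id=1)"]) }
  let(:healthy_broker) { double(:healthy_broker, fetch_metadata: metadata, disconnect: nil) }

  before do
    allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
    # The broker behind "127.0.0.1" keeps timing out (see the outer
    # before block); the one behind "::1" responds with metadata.
    allow(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker }
    allow(broker_pool).to receive(:connect).with("::1", 9092) { healthy_broker }
  end

  it "falls back to the next address and returns the metadata" do
    expect(cluster.cluster_info).to eq(metadata)
  end
end
```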
