Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support seed brokers' hostname with multiple addresses #877

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changes and additions to the library will be listed here.

- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
- Add support for `murmur2` based partitioning.
- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877).

## 1.3.0

Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ require "kafka"
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
```

You can also use a single hostname that resolves to the IP addresses of multiple seed brokers:

```ruby
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
```

### Producing Messages to Kafka

The simplest way to write a message to a Kafka topic is to call `#deliver_message`:
Expand Down
9 changes: 8 additions & 1 deletion lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,22 @@ class Client
# the SSL certificate and the signing chain of the certificate have the correct domains
# based on the CA certificate
#
# @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
# If a broker is resolved to multiple IP addresses, the client tries to connect to each
# of the addresses until it can connect.
#
# @return [Client]
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
resolve_seed_brokers: false)
@logger = TaggedLogger.new(logger)
@instrumenter = Instrumenter.new(client_id: client_id)
@seed_brokers = normalize_seed_brokers(seed_brokers)
@resolve_seed_brokers = resolve_seed_brokers

ssl_context = SslContext.build(
ca_cert_file_path: ssl_ca_cert_file_path,
Expand Down Expand Up @@ -809,6 +815,7 @@ def initialize_cluster
seed_brokers: @seed_brokers,
broker_pool: broker_pool,
logger: @logger,
resolve_seed_brokers: @resolve_seed_brokers,
)
end

Expand Down
50 changes: 28 additions & 22 deletions lib/kafka/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "kafka/broker_pool"
require "resolv"
require "set"

module Kafka
Expand All @@ -18,14 +19,16 @@ class Cluster
# @param seed_brokers [Array<URI>]
# @param broker_pool [Kafka::BrokerPool]
# @param logger [Logger]
def initialize(seed_brokers:, broker_pool:, logger:)
# @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}
def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
if seed_brokers.empty?
raise ArgumentError, "At least one seed broker must be configured"
end

@logger = TaggedLogger.new(logger)
@seed_brokers = seed_brokers
@broker_pool = broker_pool
@resolve_seed_brokers = resolve_seed_brokers
@cluster_info = nil
@stale = true

Expand Down Expand Up @@ -418,32 +421,35 @@ def get_leader_id(topic, partition)
# @return [Protocol::MetadataResponse] the cluster metadata.
def fetch_cluster_info
errors = []

@seed_brokers.shuffle.each do |node|
@logger.info "Fetching cluster metadata from #{node}"

begin
broker = @broker_pool.connect(node.hostname, node.port)
cluster_info = broker.fetch_metadata(topics: @target_topics)

if cluster_info.brokers.empty?
@logger.error "No brokers in cluster"
else
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

@stale = false

return cluster_info
(@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
node_info = node.to_s
node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
@logger.info "Fetching cluster metadata from #{node_info}"
Copy link

@sodabrew sodabrew Apr 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@logger.info "Fetching cluster metadata from #{node_info}"
@logger.info "Fetching cluster metadata from #{node.hostname}#{node_info}"

This would show both the hostname and the resolved IP address that's being attempted.

Copy link
Contributor Author

@abicky abicky Apr 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node.hostname is not necessary because node.to_s has the information:

node_info = node.to_s # e.g. "kafka://localhost:9092"
# e.g. "kafka://localhost:9092" => "kafka://localhost:9092 (127.0.0.1)"
node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip

Here is an example output:

/Users/arabiki/ghq/src/github.com/zendesk/ruby-kafka/lib/kafka/cluster.rb:454:in `fetch_cluster_info': Could not connect to any of the seed brokers: (Kafka::ConnectionError)
- kafka://localhost0:9092 (10.0.0.1): Operation timed out

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right on, that looks good already!


begin
broker = @broker_pool.connect(hostname_or_ip, node.port)
cluster_info = broker.fetch_metadata(topics: @target_topics)

if cluster_info.brokers.empty?
@logger.error "No brokers in cluster"
else
@logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"

@stale = false

return cluster_info
end
rescue Error => e
@logger.error "Failed to fetch metadata from #{node_info}: #{e}"
errors << [node_info, e]
ensure
broker.disconnect unless broker.nil?
end
rescue Error => e
@logger.error "Failed to fetch metadata from #{node}: #{e}"
errors << [node, e]
ensure
broker.disconnect unless broker.nil?
end
end

error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")

raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
end
Expand Down
43 changes: 43 additions & 0 deletions spec/cluster_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,47 @@
}.to raise_exception(ArgumentError)
end
end

# Specs for Cluster#cluster_info covering the `resolve_seed_brokers` option.
# NOTE(review): the fixtures `broker_pool`, `broker`, and `LOGGER` appear to be
# defined in an enclosing context outside this excerpt — confirm against the
# full spec file.
describe "#cluster_info" do
  # Cluster under test: one seed broker; whether its hostname is resolved to
  # IP addresses first is controlled by `resolve_seed_brokers`, which each
  # context below supplies via its own `let`.
  let(:cluster) {
    Kafka::Cluster.new(
      seed_brokers: [URI("kafka://test1:9092")],
      broker_pool: broker_pool,
      logger: LOGGER,
      resolve_seed_brokers: resolve_seed_brokers,
    )
  }

  before do
    # Every metadata fetch fails, so cluster_info must exhaust all candidate
    # addresses and raise ConnectionError with per-node error details.
    allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" }
    allow(broker).to receive(:disconnect)
  end

  context "when resolve_seed_brokers is false" do
    let(:resolve_seed_brokers) { false }

    it "tries the seed broker hostnames as is" do
      # The hostname is handed straight to the broker pool — no DNS lookup —
      # and the error line shows only the original node URI.
      expect(broker_pool).to receive(:connect).with("test1", 9092) { broker }
      expect {
        cluster.cluster_info
      }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out})
    end
  end

  context "when resolve_seed_brokers is true" do
    let(:resolve_seed_brokers) { true }

    before do
      # Stub DNS so the single hostname resolves to two addresses
      # (one IPv4, one IPv6).
      allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
    end

    it "tries all the resolved IP addresses" do
      # Both resolved addresses must be attempted, and the error message
      # includes the resolved IP in parentheses after the node URI.
      expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker }
      expect(broker_pool).to receive(:connect).with("::1", 9092) { broker }
      expect {
        cluster.cluster_info
      }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out})
    end
  end
end
end