Skip to content

Commit

Permalink
benchmarks: replace kafka-unit with docker
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
Adrian Cole committed Dec 14, 2023
1 parent 077737c commit fa83eff
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 31 deletions.
8 changes: 3 additions & 5 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@
</dependency>

<dependency>
<groupId>com.github.charithe</groupId>
<artifactId>kafka-junit</artifactId>
<!-- This version is tightly coupled to the version of kafka-clients being used.
https:/charithe/kafka-junit -->
<version>4.2.10</version>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
</dependency>

<!-- Main code uses jul and tests log with log4j -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 The OpenZipkin Authors
* Copyright 2016-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,27 +13,59 @@
*/
package zipkin2.reporter;

import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.github.charithe.kafka.KafkaJunitRule;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.InternetProtocol;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import zipkin2.reporter.kafka.KafkaSender;

import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.testcontainers.utility.DockerImageName.parse;

public class KafkaSenderBenchmarks extends SenderBenchmarks {
EphemeralKafkaBroker broker = EphemeralKafkaBroker.create();
KafkaJunitRule kafka;
static final Logger LOGGER = LoggerFactory.getLogger(KafkaContainer.class);
static final int KAFKA_PORT = 19092;

static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:2.25.1"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
withLogConsumer(new Slf4jLogConsumer(LOGGER));
}

String bootstrapServer() {
return getHost() + ":" + getMappedPort(KAFKA_PORT);
}
}

KafkaContainer kafka;
KafkaConsumer<byte[], byte[]> consumer;

@Override protected Sender createSender() throws Exception {
broker.start();
kafka = new KafkaJunitRule(broker).waitForStartup();
consumer = kafka.helper().createByteConsumer();
@Override protected Sender createSender() {
kafka = new KafkaContainer();
kafka.start();

Properties config = new Properties();
config.put(BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServer());
config.put(GROUP_ID_CONFIG, "zipkin");

consumer =
new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Collections.singletonList("zipkin"));

new Thread(() -> {
Expand All @@ -45,11 +77,11 @@ public class KafkaSenderBenchmarks extends SenderBenchmarks {
}
}).start();

return KafkaSender.create(broker.getBrokerList().get());
return KafkaSender.create(kafka.bootstrapServer());
}

@Override protected void afterSenderClose() throws Exception {
broker.stop();
@Override protected void afterSenderClose() {
kafka.stop();
}

// Convenience main entry-point
Expand Down
16 changes: 2 additions & 14 deletions benchmarks/src/main/java/zipkin2/reporter/SenderBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
*/
package zipkin2.reporter;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import okio.Okio;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -109,12 +106,12 @@ public void setup() throws Throwable {
protected abstract Sender createSender() throws Exception;

@Setup(Level.Iteration)
public void fillQueue() throws IOException {
public void fillQueue() {
while (reporter.pending.offer(clientSpan, clientSpanBytes.length));
}

@TearDown(Level.Iteration)
public void clearQueue() throws IOException {
public void clearQueue() {
reporter.pending.clear();
}

Expand All @@ -136,14 +133,5 @@ public void close() throws Exception {
}

protected abstract void afterSenderClose() throws Exception;

static byte[] spanFromResource(String jsonResource) {
InputStream stream = SenderBenchmarks.class.getResourceAsStream(jsonResource);
try {
return Okio.buffer(Okio.source(stream)).readByteArray();
} catch (IOException e) {
throw new AssertionError(e);
}
}
}

0 comments on commit fa83eff

Please sign in to comment.