Skip to content

Commit

Permalink
Use kafka transactions to make spring-kafka tests more stable (#4024)
Browse files Browse the repository at this point in the history
* Use kafka transactions to make spring-kafka tests more stable

* manual acks

* another approach: batch error handler that immediately recovers

* another try

* yet another try

* do nothing error handler

* spotless
  • Loading branch information
Mateusz Rzeszutek authored Aug 30, 2021
1 parent 0f9308b commit 01ea967
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.springframework.kafka.listener.BatchErrorHandler

class DoNothingBatchErrorHandler implements BatchErrorHandler {
@Override
void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {

def app = new SpringApplication(ConsumerConfig)
app.setDefaultProperties([
"spring.jmx.enabled" : false,
"spring.main.web-application-type" : "none",
"spring.kafka.bootstrap-servers" : kafka.bootstrapServers,
"spring.kafka.consumer.auto-offset-reset": "earliest",
"spring.kafka.consumer.linger-ms" : 10,
"spring.jmx.enabled" : false,
"spring.main.web-application-type" : "none",
"spring.kafka.bootstrap-servers" : kafka.bootstrapServers,
"spring.kafka.consumer.auto-offset-reset" : "earliest",
"spring.kafka.consumer.linger-ms" : 10,
// wait 1s between poll() calls
"spring.kafka.listener.idle-between-polls" : 1000,
"spring.kafka.producer.transaction-id-prefix": "test-",
])
applicationContext = app.run()
}
Expand All @@ -58,9 +61,11 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {

when:
runWithSpan("producer") {
kafkaTemplate.send("testTopic", "10", "testSpan1")
kafkaTemplate.send("testTopic", "20", "testSpan2")
kafkaTemplate.flush()
// wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "testSpan1")
ops.send("testTopic", "20", "testSpan2")
})
}

then:
Expand Down Expand Up @@ -152,8 +157,9 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {

when:
runWithSpan("producer") {
kafkaTemplate.send("testTopic", "10", "error")
kafkaTemplate.flush()
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "error")
})
}

then:
Expand Down Expand Up @@ -240,6 +246,8 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>()
// do not retry failed records
factory.setBatchErrorHandler(new DoNothingBatchErrorHandler())
factory.setConsumerFactory(consumerFactory)
factory.setBatchListener(true)
factory.setAutoStartup(true)
Expand Down

0 comments on commit 01ea967

Please sign in to comment.