Skip to content

Commit

Permalink
Config ErrorHandlingDeserializer to DLQ unparsable messages (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
smozely authored and wtcarter-gw committed Apr 10, 2024
1 parent 66efc92 commit 0ae108f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 10 deletions.
19 changes: 18 additions & 1 deletion packages/Manager/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ spring:

consumer:
auto-offset-reset: earliest
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.value.type.method: nz.govt.eop.messages.KafkaMessageTypes.determineTypeFromTopicName
spring.json.trusted.packages: nz.govt.eop.*
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer


# These producer settings are used when publishing messages to the DLQ
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# In the situation that a message was received as invalid JSON and could not be deserialized, it ends up being a ByteArray message that gets published.
# the JSON serializer will still serialize the ByteArray as a string, but it will be a string of bytes, not a string of characters.
# See WaterAllocationConsumerErrorHandlerTest
# Can't figure out a good way to avoid this without writing a custom serializer.
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.add.type.headers: false

streams:
properties:
Expand All @@ -31,6 +46,8 @@ spring:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
application-id: nz.govt.eop.consumers.manager


management:
endpoints:
enabled-by-default: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.stereotype.Component
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ActiveProfiles

@ActiveProfiles("test", "fake-consumer")
Expand All @@ -36,6 +37,7 @@ class WaterAllocationConsumerErrorHandlerTest(
@Autowired val fakeConsumer: FakeConsumer
) {

@DirtiesContext
@Test
fun `Should send message to DLT when processing fails multiple times`() {
// GIVEN
Expand All @@ -46,7 +48,7 @@ class WaterAllocationConsumerErrorHandlerTest(

val firstMessage =
WaterAllocationMessage(
"sourceId",
"poison",
"consentId",
ConsentStatus.valueOf("active"),
"area-id",
Expand All @@ -58,7 +60,7 @@ class WaterAllocationConsumerErrorHandlerTest(
"firstIngestId",
Instant.now())

val secondMessage = firstMessage.copy(sourceId = "poison")
val secondMessage = firstMessage.copy(sourceId = "sourceId")

// WHEN
template.send(WATER_ALLOCATION_TOPIC_NAME, "poison", firstMessage)
Expand All @@ -81,6 +83,31 @@ class WaterAllocationConsumerErrorHandlerTest(
fakeConsumer.messageProcessed.shouldBe(true)
}
}

@DirtiesContext
@Test
fun `Should handle an unparsable message by sending it to the DLT`() {
// GIVEN
val consumerProps = KafkaTestUtils.consumerProps("test", "true", broker)
val cf = DefaultKafkaConsumerFactory(consumerProps, StringDeserializer(), StringDeserializer())
val dltConsumer: Consumer<String, String> = cf.createConsumer()
dltConsumer.subscribe(listOf("$WATER_ALLOCATION_TOPIC_NAME.manager-consumer.DLT"))

val invalidMessage = """{"foo":"bar"}"""

// WHEN
template.send(WATER_ALLOCATION_TOPIC_NAME, "key1", invalidMessage)

// THEN
// Assert the first message ended up in the DLT
val dltRecord =
KafkaTestUtils.getSingleRecord(
dltConsumer, "$WATER_ALLOCATION_TOPIC_NAME.manager-consumer.DLT")
dltRecord.shouldNotBeNull()

// This assert json shows how the message is a byte array encoded as JSON
dltRecord.value().shouldBe("\"IntcImZvb1wiOlwiYmFyXCJ9Ig==\"")
}
}

@Profile("fake-consumer")
Expand Down
7 changes: 0 additions & 7 deletions packages/Manager/src/test/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ spring:
user: postgres
password: password

kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.add.type.headers: false

logging:
level:
root: WARN
Expand Down

0 comments on commit 0ae108f

Please sign in to comment.