Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle batch failures in FIFO queues correctly #1183

Merged
merged 24 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a5a0a82
Sketch out batch processing failures for FIFO queues
scottgerring Jun 8, 2023
d0699df
Write some tests
scottgerring Jun 8, 2023
44c7128
Fix bug
scottgerring Jun 8, 2023
4c061e4
Report skipped messages as failures
scottgerring Jun 8, 2023
4d5bd9a
Clean up and refactor a bit
scottgerring Jun 15, 2023
bac2581
Better testing
scottgerring Jun 15, 2023
8d42118
Update doco
scottgerring Jun 15, 2023
be8d9f9
Merge branch 'master' into fix-fifo-batch-failure
scottgerring Jun 15, 2023
b350dbd
Merge branch 'main' into fix-fifo-batch-failure
scottgerring Jun 21, 2023
9f44405
Merge branch 'main' into fix-fifo-batch-failure
scottgerring Jun 21, 2023
96d4799
Merge branch 'main' into fix-fifo-batch-failure
scottgerring Jun 22, 2023
cdce3e2
Merge branch 'main' into fix-fifo-batch-failure
scottgerring Jul 5, 2023
7105b0e
Address some review comments
scottgerring Jul 5, 2023
418152c
Merge branch 'main' into fix-fifo-batch-failure
scottgerring Jul 10, 2023
617ec8a
Use event serialization helpers
scottgerring Jul 10, 2023
0f78133
What about queues?
scottgerring Jul 10, 2023
c0cd20b
Rejig
scottgerring Jul 10, 2023
8480a41
Update powertools-sqs/src/main/java/software/amazon/lambda/powertools…
scottgerring Jul 11, 2023
a99b842
Address review comments
scottgerring Jul 11, 2023
fa497ed
Merge remote-tracking branch 'refs/remotes/origin/fix-fifo-batch-fail…
scottgerring Jul 11, 2023
6851776
Increase coverage
scottgerring Jul 11, 2023
2a39a84
Remove break
scottgerring Jul 11, 2023
405b170
Merge branch 'main' into fix-fifo-batch-failure
jeromevdl Jul 11, 2023
19d8096
Merge branch 'main' into fix-fifo-batch-failure
jeromevdl Jul 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ description: Utility
---

The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
The utility handles batch processing for both
[standard](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) and
[FIFO](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) SQS queues.

**Key Features**

Expand Down Expand Up @@ -110,8 +113,11 @@ Both have nearly the same behaviour when it comes to processing messages from th
* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will:
- **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
- **2)** if, non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
- **3)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
- **2)** If a message with a [message group ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html) fails,
the processing of the batch will be stopped and the remainder of the messages will be returned to SQS.
This behaviour [is required to handle SQS FIFO queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting).
- **3)** if non retryable exceptions occur, messages resulting in configured exceptions during processing will be immediately moved to the dead letter queue associated to the source SQS queue or deleted from the source SQS queue if `deleteNonRetryableMessageFromQueue` is set to `true`.
- **4)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue

The only difference is that **SqsUtils Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the
utility will have access to both successful and failed messaged along with failure exceptions.
Expand Down
4 changes: 4 additions & 0 deletions powertools-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-tests</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -26,6 +28,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.lambda.powertools.sqs.exception.SkippedMessageDueToFailedBatchException;
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
import software.amazon.payloadoffloading.PayloadS3Pointer;
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
Expand All @@ -43,6 +46,9 @@ public final class SqsUtils {
private static SqsClient client;
private static S3Client s3Client;

// The attribute on an SQS-FIFO message used to record the message group ID
private static final String MESSAGE_GROUP_ID = "MessageGroupId";

private SqsUtils() {
}

Expand Down Expand Up @@ -490,19 +496,51 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
}

BatchContext batchContext = new BatchContext(client);

for (SQSMessage message : event.getRecords()) {
int offset = 0;
while (offset < event.getRecords().size()) {
// Get the current message and advance to the next. Doing this here
// makes it easier for us to know where we are up to if we have to
// break out of here early.
SQSMessage message = event.getRecords().get(offset);
offset++;

// If the batch hasn't failed, try process the message
try {
handlerReturn.add(handler.process(message));
batchContext.addSuccess(message);
} catch (Exception e) {

// Record the failure
batchContext.addFailure(message, e);

// If we are trying to process a message that has a messageGroupId, we are on a FIFO queue. A failure
// now stops us from processing the rest of the batch; we break out of the loop leaving unprocessed
// messages in the queue
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
String messageGroupId = message.getAttributes() != null ?
message.getAttributes().get(MESSAGE_GROUP_ID) : null;
if (messageGroupId != null) {
LOG.info("A message in a message batch with messageGroupId {} and messageId {} failed; failing the rest of the batch too"
, messageGroupId, message.getMessageId());
break;
jeromevdl marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.error("Encountered issue processing message: {}", message.getMessageId(), e);
}
}

batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
// If we have a FIFO batch failure, unprocessed messages will remain on the queue
// past the failed message. We have to add these to the errors
if (offset < event.getRecords().size()) {
event.getRecords()
.subList(offset, event.getRecords().size())
.forEach(message -> {
LOG.info("Skipping message {} as another message with a message group failed in this batch",
message.getMessageId());
batchContext.addFailure(message, new SkippedMessageDueToFailedBatchException());
});
}

batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
return handlerReturn;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package software.amazon.lambda.powertools.sqs.exception;

/**
* Indicates that a message has been skipped due to the batch it is
* within failing.
*/
public class SkippedMessageDueToFailedBatchException extends Exception {

public SkippedMessageDueToFailedBatchException() {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package software.amazon.lambda.powertools.sqs;

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.tests.EventLoader;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.*;
import org.mockito.quality.Strictness;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.MockitoAnnotations.openMocks;
import static software.amazon.lambda.powertools.sqs.SqsUtils.batchProcessor;
import static software.amazon.lambda.powertools.sqs.SqsUtils.overrideSqsClient;

public class SqsUtilsFifoBatchProcessorTest {

private static SQSEvent sqsBatchEvent;
private MockitoSession session;

@Mock
private SqsClient sqsClient;

@Captor
private ArgumentCaptor<DeleteMessageBatchRequest> deleteMessageBatchCaptor;

public SqsUtilsFifoBatchProcessorTest() throws IOException {
sqsBatchEvent = EventLoader.loadSQSEvent("SqsFifoBatchEvent.json");
}

@BeforeEach
public void setup() {
// Establish a strict mocking session. This means that any
// calls to a mock that have not been stubbed will throw
this.session = Mockito.mockitoSession()
.strictness(Strictness.STRICT_STUBS)
.initMocks(this)
.startMocking();

overrideSqsClient(sqsClient);
}

@AfterEach
public void tearDown() {
session.finishMocking();
}

@Test
public void processWholeBatch() {
// Act
AtomicInteger processedCount = new AtomicInteger();
List<Object> results = batchProcessor(sqsBatchEvent, false, (message) -> {
processedCount.getAndIncrement();
return true;
});

// Assert
assertThat(processedCount.get()).isEqualTo(3);
assertThat(results.size()).isEqualTo(3);
}

/**
* Check that a failure in the middle of the batch:
* - deletes the successful message explicitly from SQS
* - marks the failed and subsequent message as failed
* - does not delete the failed or subsequent message
*/
@Test
public void singleFailureInMiddleOfBatch() {
// Arrange
Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
.builder().build());

// Act
AtomicInteger processedCount = new AtomicInteger();
assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
int value = processedCount.getAndIncrement();
if (value == 1) {
throw new RuntimeException("Whoops");
}
return true;
}))

// Assert
.isInstanceOf(SQSBatchProcessingException.class)
.satisfies(e -> {
List<SQSEvent.SQSMessage> failures = ((SQSBatchProcessingException)e).getFailures();
assertThat(failures.size()).isEqualTo(2);
List<String> failureIds = failures.stream()
.map(SQSEvent.SQSMessage::getMessageId)
.collect(Collectors.toList());
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(1).getMessageId());
assertThat(failureIds).contains(sqsBatchEvent.getRecords().get(2).getMessageId());
});

DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
List<String> messageIds = deleteRequest.entries().stream()
.map(DeleteMessageBatchRequestEntry::id)
.collect(Collectors.toList());
assertThat(deleteRequest.entries().size()).isEqualTo(1);
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();

}

@Test
public void singleFailureAtEndOfBatch() {

// Arrange
Mockito.when(sqsClient.deleteMessageBatch(deleteMessageBatchCaptor.capture())).thenReturn(DeleteMessageBatchResponse
.builder().build());


// Act
AtomicInteger processedCount = new AtomicInteger();
assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> batchProcessor(sqsBatchEvent, false, (message) -> {
int value = processedCount.getAndIncrement();
if (value == 2) {
throw new RuntimeException("Whoops");
}
return true;
}));

// Assert
DeleteMessageBatchRequest deleteRequest = deleteMessageBatchCaptor.getValue();
List<String> messageIds = deleteRequest.entries().stream()
.map(DeleteMessageBatchRequestEntry::id)
.collect(Collectors.toList());
assertThat(deleteRequest.entries().size()).isEqualTo(2);
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(0).getMessageId())).isTrue();
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(1).getMessageId())).isTrue();
assertThat(messageIds.contains(sqsBatchEvent.getRecords().get(2).getMessageId())).isFalse();
}

@Test
public void messageFailureStopsGroupProcessing() {
String groupToFail = sqsBatchEvent.getRecords().get(0).getAttributes().get("MessageGroupId");

assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> batchProcessor(sqsBatchEvent, (message) -> {
String groupId = message.getAttributes().get("MessageGroupId");
if (groupId.equals(groupToFail)) {
throw new RuntimeException("Failed processing");
}
return groupId;
}))
.satisfies(e -> {
assertThat(e.successMessageReturnValues().size()).isEqualTo(0);
assertThat(e.successMessageReturnValues().contains(groupToFail)).isFalse();
});
}

}
73 changes: 73 additions & 0 deletions powertools-sqs/src/test/resources/SqsFifoBatchEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
"MessageGroupId": "Group1"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649",
"MessageGroupId": "Group2"
},
"messageAttributes": {
"Attribute3" : {
"binaryValue" : "MTEwMA==",
"dataType" : "Binary"
},
"Attribute2" : {
"stringValue" : "123",
"dataType" : "Number"
},
"Attribute1" : {
"stringValue" : "AttributeValue1",
"dataType" : "String"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-9696-9c92662ba5da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649",
"MessageGroupId": "Group1"
},
"messageAttributes": {
"Attribute2" : {
"stringValue" : "123",
"dataType" : "Number"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
}
]
}