From 979c61e125d76a18c2a0353b2db89325af303ce3 Mon Sep 17 00:00:00 2001 From: Pavel Kotelevsky <38818382+chillleader@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:59:09 +0200 Subject: [PATCH] fix(sqs): fail fast when queue does not exist (#3415) * fix(sqs): fail fast when queue does not exist * fix tests (cherry picked from commit 949a7429896189911cd4499f2b8e468e5a26d9ca) --- .../connector/inbound/SqsExecutable.java | 14 ++++++++++ .../connector/inbound/SqsQueueConsumer.java | 5 ++++ .../connector/inbound/SqsExecutableTest.java | 26 +++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java index 2fdca85721..322b2a358e 100644 --- a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java +++ b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsExecutable.java @@ -7,6 +7,8 @@ package io.camunda.connector.inbound; import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.QueueAttributeName; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import io.camunda.connector.api.annotation.InboundConnector; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; @@ -15,6 +17,7 @@ import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier; import io.camunda.connector.common.suppliers.DefaultAmazonSQSClientSupplier; import io.camunda.connector.inbound.model.SqsInboundProperties; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -23,6 +26,7 @@ @InboundConnector(name = "AWS SQS Inbound", type = "io.camunda:aws-sqs-inbound:1") public class SqsExecutable implements InboundConnectorExecutable { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsExecutable.class); private final AmazonSQSClientSupplier sqsClientSupplier; private final ExecutorService executorService; @@ -54,6 +58,16 @@ public void activate(final InboundConnectorContext context) { amazonSQS = sqsClientSupplier.sqsClient( CredentialsProviderSupport.credentialsProvider(properties), region); + + try { + amazonSQS.getQueueAttributes( + properties.getQueue().url(), + List.of(QueueAttributeName.ApproximateNumberOfMessages.toString())); + } catch (QueueDoesNotExistException e) { + LOGGER.error("Queue does not exist, failing subscription activation"); + throw new RuntimeException("Queue does not exist: " + properties.getQueue().url()); + } + LOGGER.debug("SQS client created successfully"); if (sqsQueueConsumer == null) { sqsQueueConsumer = new SqsQueueConsumer(amazonSQS, properties, context); diff --git a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java index 20b0157976..cb48e97d69 100644 --- a/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java +++ b/connectors/aws/aws-sqs/src/main/java/io/camunda/connector/inbound/SqsQueueConsumer.java @@ -47,6 +47,11 @@ public void run() { do { try { receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest); + } catch (Exception e) { + LOGGER.error("Failed to receive messages from SQS queue", e); + continue; + } + try { List messages = receiveMessageResult.getMessages(); for (Message message : messages) { try { diff --git a/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java b/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java index 455b3beb46..d11af001df 100644 --- a/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java +++ b/connectors/aws/aws-sqs/src/test/java/io/camunda/connector/inbound/SqsExecutableTest.java @@ -9,6 +9,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.file.Files.readString; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -19,6 +20,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import com.fasterxml.jackson.core.type.TypeReference; @@ -117,6 +119,30 @@ public void deactivateTest() { assertThat(executorService.isShutdown()).isTrue(); } + @Test + public void nonExistingQueueTest() { + // Given + when(sqsClient.getQueueAttributes(any(), any())).thenThrow(new QueueDoesNotExistException("")); + when(supplier.sqsClient(any(AWSCredentialsProvider.class), eq(ACTUAL_QUEUE_REGION))) + .thenReturn(sqsClient); + Map properties = + Map.of( + "authentication", + Map.of( + "secretKey", ACTUAL_SECRET_KEY, + "accessKey", ACTUAL_ACCESS_KEY), + "configuration", + Map.of("region", "us-east-1"), + "queue", + Map.of("url", ACTUAL_QUEUE_URL, "pollingWaitTime", "1")); + var context = createConnectorContext(properties, createDefinition()); + consumer = new SqsQueueConsumer(sqsClient, new SqsInboundProperties(), context); + consumer.setQueueConsumerActive(true); + SqsExecutable sqsExecutable = new SqsExecutable(supplier, executorService, consumer); + // When & then + assertThrows(RuntimeException.class, () -> sqsExecutable.activate(context)); + } + private InboundConnectorDefinition createDefinition() { return InboundConnectorDefinitionBuilder.create() .bpmnProcessId("proc-id")