Skip to content

Commit

Permalink
Allows test suite to run against a RabbitMQ container
Browse files Browse the repository at this point in the history
(cherry picked from commit 425f4d6)
  • Loading branch information
acogoluegnes committed Dec 8, 2020
1 parent 9e5349f commit afb1145
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 23 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,17 @@ can specify the path to `rabbitmqctl` like the following:

./gradlew check -Drabbitmqctl.bin=/path/to/rabbitmqctl

You need a local running RabbitMQ instance.
You need a local running RabbitMQ instance.

### Running tests with Docker

Start a RabbitMQ container:

docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8

Run the test suite:

./gradlew check -i -s -Drabbitmqctl.bin=DOCKER:rabbitmq

### Building IDE project
./gradlew eclipse
Expand Down
19 changes: 11 additions & 8 deletions src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,7 +86,7 @@ private static void wait(CountDownLatch latch) throws InterruptedException {
}

@BeforeEach
public void init() throws Exception {
public void init(TestInfo info) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
Expand All @@ -96,7 +97,9 @@ public void init() throws Exception {
channel.close();
receiver = null;
sender = null;
connectionMono = Mono.just(connectionFactory.newConnection()).cache();
connectionMono = Mono.just(connectionFactory.newConnection(
info.getTestMethod().get().getName()))
.cache();
}

@AfterEach
Expand Down Expand Up @@ -142,7 +145,7 @@ public void consumeConsumerShouldRecoverAutomatically(BiFunction<Receiver, Strin
}
});

closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
closeAndWaitForRecovery(connectionMono.block());

for (int $$ : IntStream.range(0, nbMessages).toArray()) {
channel.basicPublish("", queue, null, "Hello".getBytes());
Expand Down Expand Up @@ -295,7 +298,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
)))
.subscribe();

closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
closeAndWaitForRecovery(connectionMono.block());

assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(nbMessages, counter.get());
Expand Down Expand Up @@ -345,7 +348,7 @@ public void sendWithPublishConfirmsAllMessagesShouldBeSentConfirmedAndConsumed()
}
});

closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
closeAndWaitForRecovery(connectionMono.block());

// we expect all messages to make to the queue (they're retried)
assertTrue(consumedLatch.await(20, TimeUnit.SECONDS));
Expand Down Expand Up @@ -382,15 +385,15 @@ public void topologyRecovery() throws Exception {

latch.set(new CountDownLatch(1));

closeAndWaitForRecovery((RecoverableConnection) connectionMono.block());
closeAndWaitForRecovery(connectionMono.block());

ch.basicPublish(e, "", null, "".getBytes());
assertTrue(latch.get().await(5, TimeUnit.SECONDS));
}

private void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException {
private void closeAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
CountDownLatch latch = prepareForRecovery(connection);
Host.closeConnection((NetworkConnection) connection);
Host.closeConnection(connection);
wait(latch);
}

Expand Down
52 changes: 38 additions & 14 deletions src/test/java/reactor/rabbitmq/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package reactor.rabbitmq;

import com.rabbitmq.client.impl.NetworkConnection;
import com.rabbitmq.client.Connection;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -87,23 +87,34 @@ public static Process rabbitmqctl(String command) throws IOException {
}

public static String rabbitmqctlCommand() {
return System.getProperty("rabbitmqctl.bin");
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin", System.getenv("RABBITMQCTL_BIN"));
if (rabbitmqCtl == null) {
throw new IllegalStateException("Please define the rabbitmqctl.bin system property or "
+ "the RABBITMQCTL_BIN environment variable");
}
if (rabbitmqCtl.startsWith("DOCKER:")) {
String containerId = rabbitmqCtl.split(":")[1];
return "docker exec " + containerId + " rabbitmqctl";
} else {
return rabbitmqCtl;
}
}

public static void closeConnection(String pid) throws IOException {
private static void closeConnection(String pid) throws IOException {
rabbitmqctl("close_connection '" + pid + "' 'Closed via rabbitmqctl'");
}

public static void closeConnection(NetworkConnection c) throws IOException {
public static void closeConnection(Connection c) throws IOException {
Host.ConnectionInfo ci = findConnectionInfoFor(Host.listConnections(), c);
closeConnection(ci.getPid());
}

public static List<ConnectionInfo> listConnections() throws IOException {
String output = capture(rabbitmqctl("list_connections -q pid peer_port").getInputStream());
String output = capture(rabbitmqctl("list_connections -q pid peer_port client_properties")
.getInputStream());
// output (header line presence depends on broker version):
// pid peer_port
// <[email protected]> 58713
// <[email protected]> 58713 [{"product","RabbitMQ"},{"...
String[] allLines = output.split("\n");

ArrayList<ConnectionInfo> result = new ArrayList<ConnectionInfo>();
Expand All @@ -112,18 +123,31 @@ public static List<ConnectionInfo> listConnections() throws IOException {
String[] columns = line.split("\t");
// can be also header line, so ignoring NumberFormatException
try {
result.add(new ConnectionInfo(columns[0], Integer.valueOf(columns[1])));
Integer.valueOf(columns[1]); // just to ignore header line
result.add(new ConnectionInfo(columns[0], connectionName(columns[2])));
} catch (NumberFormatException e) {
// OK
}
}
return result;
}

private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs, NetworkConnection c) {
private static String connectionName(String clientProperties) {
String beginning = "\"connection_name\",\"";
int begin = clientProperties.indexOf(beginning);
if (begin > 0) {
int start = clientProperties.indexOf(beginning) + beginning.length();
int end = clientProperties.indexOf("\"", start);
return clientProperties.substring(start, end);
} else {
return null;
}
}

private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs, Connection c) {
Host.ConnectionInfo result = null;
for (Host.ConnectionInfo ci : xs) {
if (c.getLocalPort() == ci.getPeerPort()) {
if (c.getClientProvidedName().equals(ci.getName())) {
result = ci;
break;
}
Expand All @@ -134,19 +158,19 @@ private static Host.ConnectionInfo findConnectionInfoFor(List<ConnectionInfo> xs
public static class ConnectionInfo {

private final String pid;
private final int peerPort;
private final String name;

public ConnectionInfo(String pid, int peerPort) {
public ConnectionInfo(String pid, String name) {
this.pid = pid;
this.peerPort = peerPort;
this.name = name;
}

public String getPid() {
return pid;
}

public int getPeerPort() {
return peerPort;
public String getName() {
return name;
}
}
}

0 comments on commit afb1145

Please sign in to comment.