Skip to content

Commit

Permalink
Use CyclicBarrier instead of CountdownLatch
Browse files Browse the repository at this point in the history
  • Loading branch information
williamrandolph committed Mar 29, 2024
1 parent 98b2036 commit 574bfef
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
}

public void testKibanaThreadPool() {
assertRunsWithThreadPoolsBlocked(() -> {
runWithBlockedThreadPools(() -> {
// index documents
String idToDelete = client().prepareIndex(".kibana").setSource(Map.of("foo", "delete me!")).get().getId();
String idToUpdate = client().prepareIndex(".kibana").setSource(Map.of("foo", "update me!")).get().getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

package org.elasticsearch.indices;

import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

import static org.hamcrest.Matchers.startsWith;

Expand Down Expand Up @@ -45,49 +45,54 @@ protected Set<String> threadPoolsToBlock() {
protected Settings nodeSettings() {
var settingsBuilder = Settings.builder().put(super.nodeSettings()).put("node.name", "kibana-thread-pool-tests");
for (String threadPoolName : threadPoolsToBlock()) {
settingsBuilder.put("thread_pool." + threadPoolName + ".size", 1);
settingsBuilder.put("thread_pool." + threadPoolName + ".queue_size", 0);
settingsBuilder.put("thread_pool." + threadPoolName + ".queue_size", 10);
}
return settingsBuilder.build();
}

private CountDownLatch blockThreads() {
CountDownLatch latch = new CountDownLatch(1);
for (String threadPoolName : threadPoolsToBlock()) {
node().injector().getInstance(ThreadPool.class).executor(threadPoolName).execute(() -> {
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
});
}
return latch;
}

private void assertThreadPoolsBlocked() {
TimeValue timeout = TimeValue.timeValueMillis(25);
var e1 = expectThrows(
EsRejectedExecutionException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get()
ElasticsearchTimeoutException.class,
() -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get(timeout)
);
assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable"));
var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get());
assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable"));
assertThat(e1.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e2 = expectThrows(ElasticsearchTimeoutException.class, () -> client().prepareGet(USER_INDEX, "id").get(timeout));
assertThat(e2.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
var e3 = expectThrows(
SearchPhaseExecutionException.class,
() -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get()
ElasticsearchTimeoutException.class,
() -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(timeout)
);
assertThat(e3.getMessage(), startsWith("all shards failed"));
assertThat(e3.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task."));
}

protected void assertRunsWithThreadPoolsBlocked(Runnable runnable) {
client().admin().indices().prepareCreate(USER_INDEX).get();
CountDownLatch latch = blockThreads();
protected void runWithBlockedThreadPools(Runnable runnable) {
ThreadPool threadPool = node().injector().getInstance(ThreadPool.class);
int numThreadsToBlock = threadPoolsToBlock().stream().map(threadPool::info).mapToInt(ThreadPool.Info::getMax).sum();
CyclicBarrier cb = new CyclicBarrier(numThreadsToBlock + 1);
Runnable waitAction = () -> {
safeAwait(cb);
safeAwait(cb);
};
for (String threadPoolName : threadPoolsToBlock()) {
ThreadPool.Info info = threadPool.info(threadPoolName);
for (int i = 0; i < info.getMax(); i++) {
threadPool.executor(threadPoolName).submit(waitAction);
}
}
safeAwait(cb);
try {
assertThreadPoolsBlocked();
runnable.run();
} finally {
latch.countDown();
safeAwait(cb);
}
}

public void testUserThreadPoolsAreBlocked() {
client().admin().indices().prepareCreate(USER_INDEX).get();

runWithBlockedThreadPools(this::assertThreadPoolsBlocked);

client().admin().indices().prepareDelete(USER_INDEX).get();
}
}

0 comments on commit 574bfef

Please sign in to comment.