diff --git a/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaThreadPoolTests.java b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaThreadPoolTests.java index f4c7317fed0f9..49cc307eeb7a5 100644 --- a/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaThreadPoolTests.java +++ b/modules/kibana/src/test/java/org/elasticsearch/kibana/KibanaThreadPoolTests.java @@ -28,7 +28,7 @@ protected Collection> getPlugins() { } public void testKibanaThreadPool() { - assertRunsWithThreadPoolsBlocked(() -> { + runWithBlockedThreadPools(() -> { // index documents String idToDelete = client().prepareIndex(".kibana").setSource(Map.of("foo", "delete me!")).get().getId(); String idToUpdate = client().prepareIndex(".kibana").setSource(Map.of("foo", "update me!")).get().getId(); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java b/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java index 68901a7dc1982..36bf7c2d67311 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/SystemIndexThreadPoolTests.java @@ -8,16 +8,16 @@ package org.elasticsearch.indices; -import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import static org.hamcrest.Matchers.startsWith; @@ -45,49 +45,54 @@ protected Set threadPoolsToBlock() { protected Settings nodeSettings() { var settingsBuilder = Settings.builder().put(super.nodeSettings()).put("node.name", "kibana-thread-pool-tests"); for (String threadPoolName : threadPoolsToBlock()) { - settingsBuilder.put("thread_pool." + threadPoolName + ".size", 1); - settingsBuilder.put("thread_pool." + threadPoolName + ".queue_size", 0); + settingsBuilder.put("thread_pool." + threadPoolName + ".queue_size", 10); } return settingsBuilder.build(); } - private CountDownLatch blockThreads() { - CountDownLatch latch = new CountDownLatch(1); - for (String threadPoolName : threadPoolsToBlock()) { - node().injector().getInstance(ThreadPool.class).executor(threadPoolName).execute(() -> { - try { - latch.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - }); - } - return latch; - } - private void assertThreadPoolsBlocked() { + TimeValue timeout = TimeValue.timeValueMillis(25); var e1 = expectThrows( - EsRejectedExecutionException.class, - () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get() + ElasticsearchTimeoutException.class, + () -> client().prepareIndex(USER_INDEX).setSource(Map.of("foo", "bar")).get(timeout) ); - assertThat(e1.getMessage(), startsWith("rejected execution of TimedRunnable")); - var e2 = expectThrows(EsRejectedExecutionException.class, () -> client().prepareGet(USER_INDEX, "id").get()); - assertThat(e2.getMessage(), startsWith("rejected execution of ActionRunnable")); + assertThat(e1.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); + var e2 = expectThrows(ElasticsearchTimeoutException.class, () -> client().prepareGet(USER_INDEX, "id").get(timeout)); + assertThat(e2.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); var e3 = expectThrows( - SearchPhaseExecutionException.class, - () -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get() + ElasticsearchTimeoutException.class, + () -> client().prepareSearch(USER_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(timeout) ); - assertThat(e3.getMessage(), startsWith("all shards failed")); + assertThat(e3.getMessage(), startsWith("java.util.concurrent.TimeoutException: Timeout waiting for task.")); } - protected void assertRunsWithThreadPoolsBlocked(Runnable runnable) { - client().admin().indices().prepareCreate(USER_INDEX).get(); - CountDownLatch latch = blockThreads(); + protected void runWithBlockedThreadPools(Runnable runnable) { + ThreadPool threadPool = node().injector().getInstance(ThreadPool.class); + int numThreadsToBlock = threadPoolsToBlock().stream().map(threadPool::info).mapToInt(ThreadPool.Info::getMax).sum(); + CyclicBarrier cb = new CyclicBarrier(numThreadsToBlock + 1); + Runnable waitAction = () -> { + safeAwait(cb); + safeAwait(cb); + }; + for (String threadPoolName : threadPoolsToBlock()) { + ThreadPool.Info info = threadPool.info(threadPoolName); + for (int i = 0; i < info.getMax(); i++) { + threadPool.executor(threadPoolName).submit(waitAction); + } + } + safeAwait(cb); try { - assertThreadPoolsBlocked(); runnable.run(); } finally { - latch.countDown(); + safeAwait(cb); } } + + public void testUserThreadPoolsAreBlocked() { + client().admin().indices().prepareCreate(USER_INDEX).get(); + + runWithBlockedThreadPools(this::assertThreadPoolsBlocked); + + client().admin().indices().prepareDelete(USER_INDEX).get(); + } }