Skip to content

Commit

Permalink
Block all relevant threads on any number of nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
williamrandolph committed Mar 30, 2024
1 parent 5335b14 commit 5e6ef15
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.startsWith;
Expand All @@ -31,7 +31,6 @@
* <p>When implementing this class, don't forget to override {@link ESIntegTestCase#nodePlugins()} if
* the relevant system index is defined in a plugin.</p>
*/
@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0)
public abstract class SystemIndexThreadPoolTests extends ESIntegTestCase {

private static final String USER_INDEX = "user_index";
Expand Down Expand Up @@ -60,24 +59,28 @@ private void assertThreadPoolsBlocked() {
}

protected void runWithBlockedThreadPools(Runnable runnable) {
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
int numThreadsToBlock = threadPoolsToBlock().stream().map(threadPool::info).mapToInt(ThreadPool.Info::getMax).sum();
CyclicBarrier cb = new CyclicBarrier(numThreadsToBlock + 1);
Phaser phaser = new Phaser();
Runnable waitAction = () -> {
safeAwait(cb);
safeAwait(cb);
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
};
for (String threadPoolName : threadPoolsToBlock()) {
ThreadPool.Info info = threadPool.info(threadPoolName);
for (int i = 0; i < info.getMax(); i++) {
threadPool.executor(threadPoolName).submit(waitAction);
phaser.register(); // register this test's thread

for (String nodeName : internalCluster().getNodeNames()) {
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, nodeName);
for (String threadPoolName : threadPoolsToBlock()) {
ThreadPool.Info info = threadPool.info(threadPoolName);
phaser.bulkRegister(info.getMax());
for (int i = 0; i < info.getMax(); i++) {
threadPool.executor(threadPoolName).submit(waitAction);
}
}
}
safeAwait(cb);
phaser.arriveAndAwaitAdvance();
try {
runnable.run();
} finally {
safeAwait(cb);
phaser.arriveAndAwaitAdvance();
}
}

Expand Down

0 comments on commit 5e6ef15

Please sign in to comment.