Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedicated threadpool for system index writes #61655

Merged
merged 11 commits into from
Sep 22, 2020
5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ There are several thread pools, but the important ones include:
Thread pool type is `fixed`, with a default maximum size of
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.

`system_write`::
For write operations on system indices.
Thread pool type is `fixed`, with a default maximum size of
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.

Changing a specific thread pool can be done by setting its type-specific
parameters; for example, changing the number of threads in the `write` thread
pool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -394,4 +395,8 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) {
public long ramBytesUsed() {
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();
}

/**
 * Exposes the contents of this request's {@code indices} field.
 *
 * @return a read-only view of {@code indices}; mutation attempts through the returned set
 *         throw {@link UnsupportedOperationException}
 */
public Set<String> getIndices() {
    final Set<String> readOnlyView = Collections.unmodifiableSet(indices);
    return readOnlyView;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -81,6 +83,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
Expand Down Expand Up @@ -110,20 +113,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressure indexingPressure;
private final SystemIndices systemIndices;

@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure) {
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices) {
this(threadPool, transportService, clusterService, ingestService, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, indexingPressure, System::nanoTime);
indexNameExpressionResolver, autoCreateIndex, indexingPressure, systemIndices, System::nanoTime);
}

public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, LongSupplier relativeTimeProvider) {
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices,
LongSupplier relativeTimeProvider) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
Expand All @@ -135,6 +140,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressure = indexingPressure;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}

Expand All @@ -158,17 +164,19 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes);
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
doInternalExecute(task, bulkRequest, releasingListener);
doInternalExecute(task, bulkRequest, executorName, releasingListener);
} catch (Exception e) {
releasingListener.onFailure(e);
}
}

protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

Expand Down Expand Up @@ -206,7 +214,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
Expand Down Expand Up @@ -255,7 +263,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
threadPool.executor(ThreadPool.Names.WRITE).execute(
threadPool.executor(executorName).execute(
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
}
}
Expand All @@ -272,10 +280,11 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime,
ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
}), responses, indicesThatCannotBeCreated));
}
}
});
Expand Down Expand Up @@ -336,6 +345,18 @@ static void prohibitCustomRoutingOnDataStream(DocWriteRequest<?> writeRequest, M
}
}

boolean isOnlySystem(BulkRequest request, SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices) {
final boolean onlySystem = request.getIndices().stream().allMatch(indexName -> {
final IndexAbstraction abstraction = indicesLookup.get(indexName);
if (abstraction != null) {
return abstraction.isSystem();
} else {
return systemIndices.isSystemIndex(indexName);
jaymode marked this conversation as resolved.
Show resolved Hide resolved
}
});
return onlySystem;
}

boolean needToCheck() {
return autoCreateIndex.needToCheck();
}
Expand Down Expand Up @@ -656,7 +677,8 @@ private long relativeTime() {
return relativeTimeProvider.getAsLong();
}

private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
private void processBulkIndexIngestRequest(Task task, BulkRequest original, String executorName,
ActionListener<BulkResponse> listener) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
Expand All @@ -681,18 +703,18 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Acti
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
doInternalExecute(task, bulkRequest, actionListener);
assert Thread.currentThread().getName().contains(executorName);
doInternalExecute(task, bulkRequest, executorName, actionListener);
} else {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
threadPool.executor(executorName).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
doInternalExecute(task, bulkRequest, actionListener);
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
Expand All @@ -708,7 +730,8 @@ public boolean isForceExecution() {
}
}
},
bulkRequestModifier::markItemAsDropped
bulkRequestModifier::markItemAsDropped,
executorName
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Performs shard-level bulk (index, delete or update) operations */
Expand All @@ -86,6 +89,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public static final ActionType<BulkShardResponse> TYPE = new ActionType<>(ACTION_NAME, BulkShardResponse::new);

private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
// Maps a shard to the name of the executor its writes run on: SYSTEM_WRITE when the shard's
// index metadata is flagged as system, WRITE otherwise.
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION =
    shard -> shard.indexSettings().getIndexMetadata().isSystem() ? Names.SYSTEM_WRITE : Names.WRITE;

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
Expand All @@ -94,9 +104,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexingPressure indexingPressure) {
IndexingPressure indexingPressure, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, indexingPressure);
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
Expand Down Expand Up @@ -136,7 +146,7 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool
}), listener, threadPool, executor(primary)
);
}

Expand All @@ -153,10 +163,11 @@ public static void performOnPrimary(
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
ThreadPool threadPool) {
ThreadPool threadPool,
String executorName) {
new ActionRunnable<>(listener) {

private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE);
private final Executor executor = threadPool.executor(executorName);

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand All @@ -33,35 +32,46 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Stream;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {

// Action name handed to the superclass constructor; declared final because it is a constant.
private static final String ACTION_NAME = "internal:index/seq_no/resync";
// Picks the executor name for a shard: SYSTEM_WRITE when the shard's index metadata is flagged
// as system, WRITE otherwise.
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION =
    shard -> shard.indexSettings().getIndexMetadata().isSystem() ? Names.SYSTEM_WRITE : Names.WRITE;

@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexingPressure indexingPressure) {
IndexingPressure indexingPressure, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressure);
indexingPressure, systemIndices);
}

@Override
Expand Down
Loading