Skip to content

Commit

Permalink
Expose timestamp field type on coordinator node (elastic#65873)
Browse files Browse the repository at this point in the history
Today a coordinating node does not have (easy) access to the mappings
for the indices for the searches it wishes to coordinate. This means it
can't properly interpret a timestamp range filter in a query and must
involve a copy of every shard in at least the `can_match` phase. It
therefore cannot cope with cases when shards are temporarily not started
even if those shards are irrelevant to the search.

This commit captures the mapping of the `@timestamp` field for indices
which expose a timestamp range in their index metadata.
  • Loading branch information
DaveCTurner committed Dec 4, 2020
1 parent 19bee5a commit a0e5f9b
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 0 deletions.
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
Expand Down Expand Up @@ -233,11 +234,15 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final TimestampFieldMapperService timestampFieldMapperService;

@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);

// Start watching for timestamp fields
clusterService.addStateApplier(timestampFieldMapperService);
}

public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry,
Expand Down Expand Up @@ -329,6 +334,8 @@ protected void closeInternal() {

this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);

this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand All @@ -339,6 +346,9 @@ public ClusterService clusterService() {

@Override
protected void doStop() {
clusterService.removeApplier(timestampFieldMapperService);
timestampFieldMapperService.doStop();

ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);

ExecutorService indicesStopExecutor =
Expand Down Expand Up @@ -1605,4 +1615,16 @@ public boolean allPendingDanglingIndicesWritten() {
return nodeWriteDanglingIndicesInfo == false ||
(danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0);
}

/**
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* - the index is not found,
* - the field is not found, or
* - the field is not a timestamp field.
*/
@Nullable
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
return timestampFieldMapperService.getTimestampFieldType(index);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Tracks the mapping of the {@code @timestamp} field of immutable indices that expose their timestamp range in their index metadata.
* Coordinating nodes do not have (easy) access to mappings for all indices, so we extract the type of this one field from the mapping here.
*/
public class TimestampFieldMapperService extends AbstractLifecycleComponent implements ClusterStateApplier {

private static final Logger logger = LogManager.getLogger(TimestampFieldMapperService.class);

private final IndicesService indicesService;
private final ExecutorService executor; // single thread to construct mapper services async as needed

/**
* The type of the {@code @timestamp} field keyed by index. Futures may be completed with {@code null} to indicate that there is
* no usable {@code @timestamp} field.
*/
private final Map<Index, PlainActionFuture<DateFieldMapper.DateFieldType>> fieldTypesByIndex = ConcurrentCollections.newConcurrentMap();

public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
this.indicesService = indicesService;

final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
final String threadName = "TimestampFieldMapperService#updateTask";
executor = EsExecutors.newScaling(nodeName + "/" + threadName, 0, 1, 0, TimeUnit.MILLISECONDS,
daemonThreadFactory(nodeName, threadName), threadPool.getThreadContext());
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

@Override
protected void doClose() {
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
final Metadata metadata = event.state().metadata();

// clear out mappers for indices that no longer exist or whose timestamp range is no longer known
fieldTypesByIndex.keySet().removeIf(index -> hasUsefulTimestampField(metadata.index(index)) == false);

// capture mappers for indices that do exist
for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
final IndexMetadata indexMetadata = cursor.value;
final Index index = indexMetadata.getIndex();

if (hasUsefulTimestampField(indexMetadata) && fieldTypesByIndex.containsKey(index) == false) {
logger.trace("computing timestamp mapping for {}", index);
final PlainActionFuture<DateFieldMapper.DateFieldType> future = new PlainActionFuture<>();
fieldTypesByIndex.put(index, future);

final IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.trace("computing timestamp mapping for {} async", index);
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to compute mapping for {}", index), e);
future.onResponse(null); // no need to propagate a failure to create the mapper service to searches
}

@Override
protected void doRun() throws Exception {
try (MapperService mapperService = indicesService.createIndexMapperService(indexMetadata)) {
mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_RECOVERY);
future.onResponse(fromMapperService(mapperService));
}
}
});
} else {
logger.trace("computing timestamp mapping for {} using existing index service", index);
try {
future.onResponse(fromMapperService(indexService.mapperService()));
} catch (Exception e) {
assert false : e;
future.onResponse(null);
}
}
}
}
}

private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
if (indexMetadata == null) {
return false;
}
final IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange();
return timestampMillisRange.isComplete() && timestampMillisRange != IndexLongFieldRange.UNKNOWN;
}

private static DateFieldMapper.DateFieldType fromMapperService(MapperService mapperService) {
final MappedFieldType mappedFieldType = mapperService.fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
if (mappedFieldType instanceof DateFieldMapper.DateFieldType) {
return (DateFieldMapper.DateFieldType) mappedFieldType;
} else {
return null;
}
}

/**
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* - the index is not found,
* - the field is not found,
* - the mapping is not known yet, or
* - the field is not a timestamp field.
*/
@Nullable
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
final PlainActionFuture<DateFieldMapper.DateFieldType> future = fieldTypesByIndex.get(index);
if (future == null || future.isDone() == false) {
return null;
}
return future.actionGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -29,6 +33,7 @@
import java.util.List;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -97,4 +102,72 @@ public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOEx
assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis()));
}

public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception {
internalCluster().startNodes(between(2, 4));

final String locale;
final String date;

switch (between(1, 3)) {
case 1:
locale = "";
date = "04 Feb 2020 12:01:23Z";
break;
case 2:
locale = "en_GB";
date = "04 Feb 2020 12:01:23Z";
break;
case 3:
locale = "fr_FR";
date = "04 févr. 2020 12:01:23Z";
break;
default:
throw new AssertionError("impossible");
}

assertAcked(prepareCreate("index")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.addMapping("_doc", jsonBuilder().startObject().startObject("_doc").startObject("properties")
.startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
.field("type", "date")
.field("format", "dd LLL yyyy HH:mm:ssX")
.field("locale", locale)
.endObject()
.endObject().endObject().endObject()));

final Index index = client().admin().cluster().prepareState().clear().setIndices("index").setMetadata(true)
.get().getState().metadata().index("index").getIndex();

ensureGreen("index");
if (randomBoolean()) {
client().prepareIndex("index", "_doc").setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, date).get();
}

for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
assertNull(indicesService.getTimestampFieldType(index));
}

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet());
ensureGreen("index");
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
final PlainActionFuture<DateFieldMapper.DateFieldType> timestampFieldTypeFuture = new PlainActionFuture<>();
assertBusy(() -> {
final DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(index);
assertNotNull(timestampFieldType);
timestampFieldTypeFuture.onResponse(timestampFieldType);
});
assertTrue(timestampFieldTypeFuture.isDone());
assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().locale().toString(), equalTo(locale));
assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().parseMillis(date), equalTo(1580817683000L));
}

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index").setFreeze(false)).actionGet());
ensureGreen("index");
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
assertNull(indicesService.getTimestampFieldType(index));
}
}

}

0 comments on commit a0e5f9b

Please sign in to comment.