[ML] Add incremental id during data frame analytics reindexing (#61943)
Previously, we added a copy of the `_id` during reindexing and sorted
the destination index on that. This allowed us to traverse the docs in the
destination index in a stable order, multiple times and efficiently.
However, a sorted destination index cannot have `nested` typed fields.
This is a problem, as it prevents us from providing a good experience
with our evaluate API when it comes to computing metrics for specific
classes, features, etc.

This commit changes the approach so that it results in a destination
index that allows nested fields.

Instead of adding a copy of the `_id` field, we now add an incremental
id that we can use to traverse the docs in a stable order. We also
ensure we always assign the same incremental id to the same doc from
the source indices by sorting on `_seq_no` during reindexing. That,
in combination with the reindex API using scroll, gives us a stable
order, as scroll uses the (`_index`, `_doc`, shard_id) tuple to resolve ties.
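
As a hedged sketch (index names are hypothetical; the real wiring is in
the diff below), the reindex request is configured roughly like this:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortOrder;

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("source-index"); // hypothetical source index
reindexRequest.setDestIndex("dest-index");       // hypothetical destination index
// Sort on _seq_no so the same source doc always receives the same incremental id
reindexRequest.getSearchRequest().source().sort(SeqNoFieldMapper.NAME, SortOrder.ASC);
// A single slice so that one counter numbers all docs
reindexRequest.setSlices(1);
// The Painless script increments a mutable counter param and stamps each doc;
// "ml__incremental_id" is DestinationIndex.INCREMENTAL_ID in the real code
Map<String, Object> counter = new HashMap<>();
counter.put("value", -1);
reindexRequest.setScript(new Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG,
    "ctx._source.ml__incremental_id = ++params.counter.value",
    Collections.singletonMap("counter", counter)));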

The extractor no longer needs to scroll. Instead, we sort on the incremental
id and perform ranged searches to avoid the overhead of sorting all docs.
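
A minimal sketch of one such ranged search, assuming the destination index
name and batch size (the field is the `ml__incremental_id` added during reindexing):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

long from = 0;        // start of the current batch (assumed bookkeeping)
int batchSize = 1000; // assumed batch size
SearchRequest searchRequest = new SearchRequest("dest-index"); // hypothetical index
searchRequest.source(new SearchSourceBuilder()
    // Match only docs whose incremental id falls in [from, from + batchSize)
    .query(QueryBuilders.rangeQuery("ml__incremental_id").gte(from).lt(from + batchSize))
    .sort("ml__incremental_id", SortOrder.ASC)
    .size(batchSize));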

Finally, the `TestDocsIterator` is simply changed to `search_after` the incremental id.
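
Sketched below (not the actual `TestDocsIterator` code; the paging state is assumed):

import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

Long lastSeenId = null; // incremental id of the last doc from the previous page, if any
SearchSourceBuilder source = new SearchSourceBuilder()
    .sort("ml__incremental_id", SortOrder.ASC)
    .size(1000);
if (lastSeenId != null) {
    // Resume immediately after the last doc of the previous page
    source.searchAfter(new Object[] { lastSeenId });
}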

With these changes, data frame analytics jobs do not use scroll at any point.

With all of this in place, the commit adds the `nested` type to the necessary
fields of the `classification` and `regression` analyses results.
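
For illustration, the `feature_importance` mapping that the `classification`
analysis now declares (built in the static block in the diff below) is
equivalent to the following index-mapping JSON; the `regression` variant is
the same without the nested `classes` object:

"feature_importance": {
  "dynamic": false,
  "type": "nested",
  "properties": {
    "feature_name": { "type": "keyword" },
    "importance": { "type": "double" },
    "classes": {
      "dynamic": false,
      "type": "nested",
      "properties": {
        "class_name": { "type": "keyword" },
        "importance": { "type": "double" }
      }
    }
  }
}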
dimitris-athanasiou authored Sep 4, 2020
1 parent bcd6706 commit 5d1be25
Showing 13 changed files with 151 additions and 272 deletions.
@@ -15,6 +15,9 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.FieldAliasMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.LenientlyParsedPreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.StrictlyParsedPreProcessor;
@@ -123,6 +126,30 @@ public static Classification fromXContent(XContentParser parser, boolean ignoreU
)
);

+static final Map<String, Object> FEATURE_IMPORTANCE_MAPPING;
+static {
+Map<String, Object> classesProperties = new HashMap<>();
+classesProperties.put("class_name", Collections.singletonMap("type", KeywordFieldMapper.CONTENT_TYPE));
+classesProperties.put("importance", Collections.singletonMap("type", NumberFieldMapper.NumberType.DOUBLE.typeName()));
+
+Map<String, Object> classesMapping = new HashMap<>();
+classesMapping.put("dynamic", false);
+classesMapping.put("type", ObjectMapper.NESTED_CONTENT_TYPE);
+classesMapping.put("properties", classesProperties);
+
+Map<String, Object> properties = new HashMap<>();
+properties.put("feature_name", Collections.singletonMap("type", KeywordFieldMapper.CONTENT_TYPE));
+properties.put("importance", Collections.singletonMap("type", NumberFieldMapper.NumberType.DOUBLE.typeName()));
+properties.put("classes", classesMapping);
+
+Map<String, Object> mapping = new HashMap<>();
+mapping.put("dynamic", false);
+mapping.put("type", ObjectMapper.NESTED_CONTENT_TYPE);
+mapping.put("properties", properties);
+
+FEATURE_IMPORTANCE_MAPPING = Collections.unmodifiableMap(mapping);
+}
+
private final String dependentVariable;
private final BoostedTreeParams boostedTreeParams;
private final String predictionFieldName;
@@ -347,7 +374,7 @@ public List<FieldCardinalityConstraint> getFieldCardinalityConstraints() {
@Override
public Map<String, Object> getExplicitlyMappedFields(Map<String, Object> mappingsProperties, String resultsFieldName) {
Map<String, Object> additionalProperties = new HashMap<>();
-additionalProperties.put(resultsFieldName + ".feature_importance", MapUtils.classificationFeatureImportanceMapping());
+additionalProperties.put(resultsFieldName + ".feature_importance", FEATURE_IMPORTANCE_MAPPING);
Object dependentVariableMapping = extractMapping(dependentVariable, mappingsProperties);
if ((dependentVariableMapping instanceof Map) == false) {
return additionalProperties;

This file was deleted.

@@ -14,7 +14,9 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.LenientlyParsedPreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.StrictlyParsedPreProcessor;
@@ -97,6 +99,20 @@ public static Regression fromXContent(XContentParser parser, boolean ignoreUnkno
)
);

+static final Map<String, Object> FEATURE_IMPORTANCE_MAPPING;
+static {
+Map<String, Object> properties = new HashMap<>();
+properties.put("feature_name", Collections.singletonMap("type", KeywordFieldMapper.CONTENT_TYPE));
+properties.put("importance", Collections.singletonMap("type", NumberFieldMapper.NumberType.DOUBLE.typeName()));
+
+Map<String, Object> mapping = new HashMap<>();
+mapping.put("dynamic", false);
+mapping.put("type", ObjectMapper.NESTED_CONTENT_TYPE);
+mapping.put("properties", properties);
+
+FEATURE_IMPORTANCE_MAPPING = Collections.unmodifiableMap(mapping);
+}
+
private final String dependentVariable;
private final BoostedTreeParams boostedTreeParams;
private final String predictionFieldName;
@@ -269,7 +285,7 @@ public List<FieldCardinalityConstraint> getFieldCardinalityConstraints() {
@Override
public Map<String, Object> getExplicitlyMappedFields(Map<String, Object> mappingsProperties, String resultsFieldName) {
Map<String, Object> additionalProperties = new HashMap<>();
-additionalProperties.put(resultsFieldName + ".feature_importance", MapUtils.regressionFeatureImportanceMapping());
+additionalProperties.put(resultsFieldName + ".feature_importance", FEATURE_IMPORTANCE_MAPPING);
// Prediction field should be always mapped as "double" rather than "float" in order to increase precision in case of
// high (over 10M) values of dependent variable.
additionalProperties.put(resultsFieldName + "." + predictionFieldName,
@@ -352,20 +352,20 @@ public void testFieldCardinalityLimitsIsNonEmpty() {

public void testGetExplicitlyMappedFields() {
assertThat(new Classification("foo").getExplicitlyMappedFields(null, "results"),
-equalTo(Collections.singletonMap("results.feature_importance", MapUtils.classificationFeatureImportanceMapping())));
+equalTo(Collections.singletonMap("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING)));
assertThat(new Classification("foo").getExplicitlyMappedFields(Collections.emptyMap(), "results"),
-equalTo(Collections.singletonMap("results.feature_importance", MapUtils.classificationFeatureImportanceMapping())));
+equalTo(Collections.singletonMap("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING)));
assertThat(
new Classification("foo").getExplicitlyMappedFields(Collections.singletonMap("foo", "not_a_map"), "results"),
-equalTo(Collections.singletonMap("results.feature_importance", MapUtils.classificationFeatureImportanceMapping())));
+equalTo(Collections.singletonMap("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING)));
Map<String, Object> explicitlyMappedFields = new Classification("foo").getExplicitlyMappedFields(
Collections.singletonMap("foo", Collections.singletonMap("bar", "baz")),
"results");
assertThat(explicitlyMappedFields,
allOf(
hasEntry("results.foo_prediction", Collections.singletonMap("bar", "baz")),
hasEntry("results.top_classes.class_name", Collections.singletonMap("bar", "baz"))));
-assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", MapUtils.classificationFeatureImportanceMapping()));
+assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING));

explicitlyMappedFields = new Classification("foo").getExplicitlyMappedFields(
new HashMap<>() {{
@@ -380,7 +380,7 @@ public void testGetExplicitlyMappedFields() {
allOf(
hasEntry("results.foo_prediction", Collections.singletonMap("type", "long")),
hasEntry("results.top_classes.class_name", Collections.singletonMap("type", "long"))));
-assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", MapUtils.classificationFeatureImportanceMapping()));
+assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING));

assertThat(
new Classification("foo").getExplicitlyMappedFields(
@@ -389,7 +389,7 @@ public void testGetExplicitlyMappedFields() {
put("path", "missing");
}}),
"results"),
-equalTo(Collections.singletonMap("results.feature_importance", MapUtils.classificationFeatureImportanceMapping())));
+equalTo(Collections.singletonMap("results.feature_importance", Classification.FEATURE_IMPORTANCE_MAPPING)));
}

public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
@@ -317,7 +317,7 @@ public void testFieldCardinalityLimitsIsEmpty() {
public void testGetExplicitlyMappedFields() {
Map<String, Object> explicitlyMappedFields = new Regression("foo").getExplicitlyMappedFields(null, "results");
assertThat(explicitlyMappedFields, hasEntry("results.foo_prediction", Collections.singletonMap("type", "double")));
-assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", MapUtils.regressionFeatureImportanceMapping()));
+assertThat(explicitlyMappedFields, hasEntry("results.feature_importance", Regression.FEATURE_IMPORTANCE_MAPPING));
}

public void testGetStateDocId() {
@@ -28,10 +28,12 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
+import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.core.ClientHelper;
@@ -49,6 +51,9 @@
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

import java.time.Clock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -263,11 +268,27 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
reindexRequest.setRefresh(true);
reindexRequest.setSourceIndices(config.getSource().getIndex());
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
+reindexRequest.getSearchRequest().allowPartialSearchResults(false);
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
+reindexRequest.getSearchRequest().source().sort(SeqNoFieldMapper.NAME, SortOrder.ASC);
reindexRequest.setDestIndex(config.getDest().getIndex());
-reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
+
+// We explicitly set slices to 1 as we cannot parallelize in order to have the incremental id
+reindexRequest.setSlices(1);
+Map<String, Object> counterValueParam = new HashMap<>();
+counterValueParam.put("value", -1);
+reindexRequest.setScript(
+new Script(
+Script.DEFAULT_SCRIPT_TYPE,
+Script.DEFAULT_SCRIPT_LANG,
+// We use indirection here because top level params are immutable.
+// This is a work around at the moment but the plan is to make this a feature of reindex API.
+"ctx._source." + DestinationIndex.INCREMENTAL_ID + " = ++params.counter.value",
+Collections.singletonMap("counter", counterValueParam)
+)
+);
+
reindexRequest.setParentTask(task.getParentTaskId());
-reindexRequest.getSearchRequest().allowPartialSearchResults(false);

final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
@@ -23,9 +23,7 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexSortConfig;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
-import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
@@ -46,7 +44,7 @@
*/
public final class DestinationIndex {

-public static final String ID_COPY = "ml__id_copy";
+public static final String INCREMENTAL_ID = "ml__incremental_id";

/**
* The field that indicates whether a doc was used for training or not
@@ -136,8 +134,6 @@ private static Settings settings(GetSettingsResponse settingsResponse) {
Integer maxNumberOfReplicas = findMaxSettingValue(settingsResponse, IndexMetadata.SETTING_NUMBER_OF_REPLICAS);

Settings.Builder settingsBuilder = Settings.builder();
-settingsBuilder.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), ID_COPY);
-settingsBuilder.put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), SortOrder.ASC);
if (maxNumberOfShards != null) {
settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, maxNumberOfShards);
}
@@ -163,7 +159,7 @@ private static Integer findMaxSettingValue(GetSettingsResponse settingsResponse,

private static Map<String, Object> createAdditionalMappings(DataFrameAnalyticsConfig config, Map<String, Object> mappingsProperties) {
Map<String, Object> properties = new HashMap<>();
-properties.put(ID_COPY, Map.of("type", KeywordFieldMapper.CONTENT_TYPE));
+properties.put(INCREMENTAL_ID, Map.of("type", NumberFieldMapper.NumberType.LONG.typeName()));
properties.putAll(config.getAnalysis().getExplicitlyMappedFields(mappingsProperties, config.getDest().getResultsField()));
return properties;
}