Skip to content

Commit

Permalink
For sort request on timeseries field use non concurrent search path
Browse files Browse the repository at this point in the history
Signed-off-by: Sorabh Hamirwasia <[email protected]>
  • Loading branch information
sohami committed Aug 25, 2023
1 parent 8cfde6c commit 691c4ed
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https:/opensearch-project/OpenSearch/pull/9528)))
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https:/opensearch-project/OpenSearch/pull/9469))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https:/opensearch-project/OpenSearch/pull/9448/))
- Use non-concurrent path for sort request on timeseries index and field()

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.Version;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
Expand Down Expand Up @@ -890,11 +891,11 @@ public boolean shouldUseConcurrentSearch() {
* Evaluate if parsed request supports concurrent segment search
*/
public void evaluateRequestShouldUseConcurrentSearch() {
boolean useConcurrentSearch = !isSortOnTimeSeriesField();
if (aggregations() != null && aggregations().factories() != null) {
requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch());
} else {
requestShouldUseConcurrentSearch.set(true);
useConcurrentSearch = useConcurrentSearch && aggregations().factories().allFactoriesSupportConcurrentSearch();
}
requestShouldUseConcurrentSearch.set(useConcurrentSearch);
}

public void setProfilers(Profilers profilers) {
Expand Down Expand Up @@ -965,4 +966,14 @@ public int getTargetMaxSliceCount() {
}
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
}

@Override
public boolean isSortOnTimeSeriesField() {
return sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CombinedBitSet;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.search.DocValueFormat;
Expand Down Expand Up @@ -522,19 +521,9 @@ private boolean shouldReverseLeafReaderContexts() {
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
&& searchContext.sort().sort.getSort() != null
&& searchContext.sort().sort.getSort().length > 0
&& searchContext.sort().sort.getSort()[0].getReverse() == false
&& searchContext.sort().sort.getSort()[0].getField() != null
&& searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
}
return false;
return searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()
&& searchContext.isSortOnTimeSeriesField()
&& searchContext.sort().sort.getSort()[0].getReverse() == false;
}

// package-private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,9 @@ public boolean shouldUseConcurrentSearch() {
public int getTargetMaxSliceCount() {
return in.getTargetMaxSliceCount();
}

@Override
public boolean isSortOnTimeSeriesField() {
return in.isSortOnTimeSeriesField();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,9 @@ public String toString() {
public abstract BucketCollectorProcessor bucketCollectorProcessor();

public abstract int getTargetMaxSliceCount();

/**
* @return true: if sort is on timestamp field, false: otherwise
*/
public abstract boolean isSortOnTimeSeriesField();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ public boolean searchWith(
boolean hasTimeout
) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.info("Using concurrent search over segments (experimental)");
LOGGER.debug("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
LOGGER.debug("Using non-concurrent search over segments");
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}
Expand All @@ -73,9 +74,10 @@ public boolean searchWith(
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.info("Using concurrent search over segments (experimental)");
LOGGER.debug("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
LOGGER.debug("Using non-concurrent search over segments");
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.opensearch.Version;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand All @@ -75,13 +80,15 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -547,6 +554,159 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
}

public void testSearchPathEvaluationUsingSortField() throws Exception {
// enable the concurrent set FeatureFlag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
when(shardSearchRequest.shardId()).thenReturn(shardId);

ThreadPool threadPool = new TestThreadPool(this.getClass().getName());
IndexShard indexShard = mock(IndexShard.class);
QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class);
when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(indexShard.getThreadPool()).thenReturn(threadPool);

Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.build();

IndexService indexService = mock(IndexService.class);
QueryShardContext queryShardContext = mock(QueryShardContext.class);
when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn(
queryShardContext
);

IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexService.getIndexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {

final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
@Override
protected void doClose() {}

@Override
protected Engine.Searcher acquireSearcherInternal(String source) {
try {
IndexReader reader = w.getReader();
return new Engine.Searcher(
"test",
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
reader
);
} catch (IOException exc) {
throw new AssertionError(exc);
}
}
};

SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
ReaderContext readerContext = new ReaderContext(
newContextId(),
indexService,
indexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);

final ClusterService clusterService = mock(ClusterService.class);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.EMPTY_SET);
clusterSettings.registerSetting(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING);
clusterSettings.applySettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build()
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
DefaultSearchContext context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);

// Case1: if sort is on timestamp field, non-concurrent path is used
context.sort(
new SortAndFormats(new Sort(new SortField("@timestamp", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case2: if sort is on other field, concurrent path is used
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
clusterService,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.sort(
new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
assertFalse(context.shouldUseConcurrentSearch());
} else {
assertTrue(context.shouldUseConcurrentSearch());
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 3: With no sort, concurrent path is used
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
clusterService,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
assertFalse(context.shouldUseConcurrentSearch());
} else {
assertTrue(context.shouldUseConcurrentSearch());
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// shutdown the threadpool
threadPool.shutdown();
}
}

private ShardSearchContextId newContextId() {
return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -692,6 +693,16 @@ public int getTargetMaxSliceCount() {
return maxSliceCount;
}

@Override
public boolean isSortOnTimeSeriesField() {
return sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
}

/**
* Clean the query results by consuming all of it
*/
Expand Down

0 comments on commit 691c4ed

Please sign in to comment.