Skip to content

Commit

Permalink
QL: add filtering query dsl support to IndexResolver (#60514) (#61201)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7b3635d)
  • Loading branch information
astefan authored Aug 17, 2020
1 parent 12313ca commit 0954323
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> list
if(configuration.isCancelled()){
throw new TaskCancelledException("cancelled");
}
indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), wrap(r -> {
listener.onResponse(preAnalyzer.preAnalyze(parsed, r));
}, listener::onFailure));
indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), configuration.filter(),
wrap(r -> {
listener.onResponse(preAnalyzer.preAnalyze(parsed, r));
}, listener::onFailure));
}

private LogicalPlan doParse(String eql, ParserParams params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.ql.QlIllegalArgumentException;
import org.elasticsearch.xpack.ql.type.ConstantKeywordEsField;
import org.elasticsearch.xpack.ql.type.DataType;
Expand Down Expand Up @@ -280,9 +281,9 @@ private void filterResults(String javaRegex, GetAliasesResponse aliases, GetInde
/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
*/
public void resolveAsMergedMapping(String indexWildcard, String javaRegex, boolean includeFrozen,
public void resolveAsMergedMapping(String indexWildcard, String javaRegex, boolean includeFrozen, QueryBuilder filter,
ActionListener<IndexResolution> listener) {
FieldCapabilitiesRequest fieldRequest = createFieldCapsRequest(indexWildcard, includeFrozen);
FieldCapabilitiesRequest fieldRequest = createFieldCapsRequest(indexWildcard, includeFrozen, filter);
client.fieldCaps(fieldRequest,
ActionListener.wrap(
response -> listener.onResponse(mergedMappings(typeRegistry, indexWildcard, response.getIndices(), response.get())),
Expand Down Expand Up @@ -458,11 +459,12 @@ private static EsField createField(DataTypeRegistry typeRegistry, String fieldNa
return new EsField(fieldName, esType, props, isAggregateable, isAlias);
}

private static FieldCapabilitiesRequest createFieldCapsRequest(String index, boolean includeFrozen) {
private static FieldCapabilitiesRequest createFieldCapsRequest(String index, boolean includeFrozen, QueryBuilder filter) {
return new FieldCapabilitiesRequest()
.indices(Strings.commaDelimitedListToStringArray(index))
.fields("*")
.includeUnmapped(true)
.indexFilter(filter)
//lenient because we throw our own errors looking at the response e.g. if something was not resolved
//also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
.indicesOptions(includeFrozen ? FIELD_CAPS_FROZEN_INDICES_OPTIONS : FIELD_CAPS_INDICES_OPTIONS);
Expand All @@ -471,9 +473,9 @@ private static FieldCapabilitiesRequest createFieldCapsRequest(String index, boo
/**
* Resolves a pattern to multiple, separate indices. Doesn't perform validation.
*/
public void resolveAsSeparateMappings(String indexWildcard, String javaRegex, boolean includeFrozen,
public void resolveAsSeparateMappings(String indexWildcard, String javaRegex, boolean includeFrozen, QueryBuilder filter,
ActionListener<List<EsIndex>> listener) {
FieldCapabilitiesRequest fieldRequest = createFieldCapsRequest(indexWildcard, includeFrozen);
FieldCapabilitiesRequest fieldRequest = createFieldCapsRequest(indexWildcard, includeFrozen, filter);
client.fieldCaps(fieldRequest, wrap(response -> {
client.admin().indices().getAliases(createGetAliasesRequest(response, includeFrozen), wrap(aliases ->
listener.onResponse(separateMappings(typeRegistry, javaRegex, response.getIndices(), response.get(), aliases.getAliases())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ public String toString() {
}

protected void index(String... docs) throws IOException {
Request request = new Request("POST", "/test/_bulk");
indexWithIndexName("test", docs);
}

protected void indexWithIndexName(String indexName, String... docs) throws IOException {
Request request = new Request("POST", "/" + indexName + "/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
for (String doc : docs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,86 @@ public void testBasicQueryWithParameters() throws IOException {
);
}

/**
* Test for filtering the field_caps response with a filter.
* Because there is no actual SELECT involved (thus, the REST request filter not actually being applied on an actual _search), we can
* test if the filtering is correctly applied at field_caps request level.
*/
public void testSysColumnsCommandWithFilter() throws IOException {
String mode = randomMode();
// create three indices with same @timestamp date field and with differently named one more field
indexWithIndexName("test2018", "{\"@timestamp\":\"2018-06-01\",\"field2018\":\"foo\"}");
indexWithIndexName("test2019", "{\"@timestamp\":\"2019-06-01\",\"field2019\":\"foo\"}");
indexWithIndexName("test2020", "{\"@timestamp\":\"2020-06-01\",\"field2020\":\"foo\"}");

// filter the results so that only test2020's columns are displayed
Map<String, Object> actual = runSql(
new StringEntity(
query("SYS COLUMNS").mode(mode).filter("{\"range\": {\"@timestamp\": {\"gte\": \"2020\"}}}").toString(),
ContentType.APPLICATION_JSON
),
StringUtils.EMPTY,
mode
);
@SuppressWarnings("unchecked")
List<List<String>> rows = (List<List<String>>) actual.get("rows");
assertEquals(3, rows.size());
List<String> currentRow = rows.get(0);
assertEquals("test2020", currentRow.get(2));
assertEquals("@timestamp", currentRow.get(3));
currentRow = rows.get(1);
assertEquals("test2020", currentRow.get(2));
assertEquals("field2020", currentRow.get(3));
currentRow = rows.get(2);
assertEquals("test2020", currentRow.get(2));
assertEquals("field2020.keyword", currentRow.get(3));
}

/**
* Similar test with {@link #testSysColumnsCommandWithFilter()} but using "SHOW COLUMNS" command which, compared to "SYS COLUMNS"
* goes through a different calls path in IndexResolver
*/
@SuppressWarnings("unchecked")
public void testShowColumnsCommandWithFilter() throws IOException {
String mode = randomMode();
// create three indices with same @timestamp date field and with differently named one more field
indexWithIndexName("test2018", "{\"@timestamp\":\"2018-06-01\",\"field2018\":\"foo\"}");
indexWithIndexName("test2019", "{\"@timestamp\":\"2019-06-01\",\"field2019\":\"foo\"}");
indexWithIndexName("test2020", "{\"@timestamp\":\"2020-06-01\",\"field2020\":\"foo\"}");

// filter the results so that only test2020's columns are displayed
Map<String, Object> actual = runSql(
new StringEntity(
query("SHOW COLUMNS FROM test2020").mode(mode).filter("{\"range\": {\"@timestamp\": {\"gte\": \"2020\"}}}").toString(),
ContentType.APPLICATION_JSON
),
StringUtils.EMPTY,
mode
);

List<List<String>> rows = (List<List<String>>) actual.get("rows");
assertEquals(3, rows.size());
List<String> currentRow = rows.get(0);
assertEquals("@timestamp", currentRow.get(0));
currentRow = rows.get(1);
assertEquals("field2020", currentRow.get(0));
currentRow = rows.get(2);
assertEquals("field2020.keyword", currentRow.get(0));

// the second test is from an index that is filtered out by the range filter, so the result list should be empty
actual = runSql(
new StringEntity(
query("SHOW COLUMNS FROM test2019").mode(mode).filter("{\"range\": {\"@timestamp\": {\"gte\": \"2020\"}}}").toString(),
ContentType.APPLICATION_JSON
),
StringUtils.EMPTY,
mode
);

rows = (List<List<String>>) actual.get("rows");
assertTrue(rows.isEmpty());
}

public void testBasicTranslateQueryWithFilter() throws IOException {
index("{\"test\":\"foo\"}", "{\"test\":\"bar\"}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void execute(SqlSession session, ActionListener<Page> listener) {
String regex = pattern != null ? pattern.asJavaRegex() : null;

boolean withFrozen = includeFrozen || session.configuration().includeFrozen();
session.indexResolver().resolveAsMergedMapping(idx, regex, withFrozen, ActionListener.wrap(
session.indexResolver().resolveAsMergedMapping(idx, regex, withFrozen, session.configuration().filter(), ActionListener.wrap(
indexResult -> {
List<List<?>> rows = emptyList();
if (indexResult.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,25 @@ public void execute(SqlSession session, ActionListener<Page> listener) {

// special case for '%' (translated to *)
if ("*".equals(idx)) {
session.indexResolver().resolveAsSeparateMappings(idx, regex, includeFrozen, ActionListener.wrap(esIndices -> {
List<List<?>> rows = new ArrayList<>();
for (EsIndex esIndex : esIndices) {
fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode);
}

session.indexResolver().resolveAsSeparateMappings(idx, regex, includeFrozen, session.configuration().filter(),
ActionListener.wrap(esIndices -> {
List<List<?>> rows = new ArrayList<>();
for (EsIndex esIndex : esIndices) {
fillInRows(cluster, esIndex.name(), esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
}, listener::onFailure));
}
// otherwise use a merged mapping
else {
session.indexResolver().resolveAsMergedMapping(idx, regex, includeFrozen, ActionListener.wrap(r -> {
List<List<?>> rows = new ArrayList<>();
// populate the data only when a target is found
if (r.isValid()) {
EsIndex esIndex = r.get();
fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode);
}

session.indexResolver().resolveAsMergedMapping(idx, regex, includeFrozen, session.configuration().filter(),
ActionListener.wrap(r -> {
List<List<?>> rows = new ArrayList<>();
// populate the data only when a target is found
if (r.isValid()) {
EsIndex esIndex = r.get();
fillInRows(cluster, indexName, esIndex.mapping(), null, rows, columnMatcher, mode);
}
listener.onResponse(ListCursor.of(Rows.schema(output), rows, session.configuration().pageSize()));
}, listener::onFailure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private <T> void preAnalyze(LogicalPlan parsed, Function<IndexResolution, T> act
}

boolean includeFrozen = configuration.includeFrozen() || tableInfo.isFrozen();
indexResolver.resolveAsMergedMapping(table.index(), null, includeFrozen,
indexResolver.resolveAsMergedMapping(table.index(), null, includeFrozen, configuration.filter(),
wrap(indexResult -> listener.onResponse(action.apply(indexResult)), listener::onFailure));
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import org.elasticsearch.xpack.ql.index.EsIndex;
import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import org.elasticsearch.xpack.ql.index.IndexResolver.IndexInfo;
import org.elasticsearch.xpack.ql.index.IndexResolver.IndexType;
import org.elasticsearch.xpack.ql.type.EsField;
import org.elasticsearch.xpack.sql.SqlTestUtils;
import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
import org.elasticsearch.xpack.sql.parser.SqlParser;
Expand Down Expand Up @@ -55,9 +52,6 @@ public class SysColumnsTests extends ESTestCase {

private final SqlParser parser = new SqlParser();
private final Map<String, EsField> mapping = loadMapping("mapping-multi-field-with-nested.json", true);
private final IndexInfo index = new IndexInfo("test_emp", IndexType.STANDARD_INDEX);
private final IndexInfo alias = new IndexInfo("alias", IndexType.ALIAS);


public void testSysColumns() {
List<List<?>> rows = new ArrayList<>();
Expand Down Expand Up @@ -565,22 +559,18 @@ private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> para
IndexResolver resolver = mock(IndexResolver.class);
when(resolver.clusterName()).thenReturn(CLUSTER_NAME);
doAnswer(invocation -> {
((ActionListener<IndexResolution>) invocation.getArguments()[3]).onResponse(IndexResolution.valid(test));
((ActionListener<IndexResolution>) invocation.getArguments()[4]).onResponse(IndexResolution.valid(test));
return Void.TYPE;
}).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any());
}).when(resolver).resolveAsMergedMapping(any(), any(), anyBoolean(), any(), any());
doAnswer(invocation -> {
((ActionListener<List<EsIndex>>) invocation.getArguments()[3]).onResponse(singletonList(test));
((ActionListener<List<EsIndex>>) invocation.getArguments()[4]).onResponse(singletonList(test));
return Void.TYPE;
}).when(resolver).resolveAsSeparateMappings(any(), any(), anyBoolean(), any());
}).when(resolver).resolveAsSeparateMappings(any(), any(), anyBoolean(), any(), any());

SqlSession session = new SqlSession(config, null, null, resolver, null, null, null, null, null);
return new Tuple<>(cmd, session);
}

private Tuple<Command, SqlSession> sql(String sql, List<SqlTypedParamValue> params, Map<String, EsField> mapping) {
return sql(sql, params, SqlTestUtils.TEST_CFG, mapping);
}

private static void checkOdbcShortTypes(SchemaRowSet r) {
assertEquals(15, r.size());
// https:/elastic/elasticsearch/issues/35376
Expand Down

0 comments on commit 0954323

Please sign in to comment.