Address PR comments
Signed-off-by: Rishabh Maurya <[email protected]>
rishabhmaurya committed May 24, 2024
1 parent f2ea954 commit 5e276cb
Showing 2 changed files with 110 additions and 35 deletions.

server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java
@@ -9,7 +9,7 @@
 package org.opensearch.index.mapper;

 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.ReaderUtil;
 import org.opensearch.common.Randomness;
 import org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.common.xcontent.json.JsonXContent;
@@ -18,12 +18,13 @@
 import org.opensearch.search.lookup.SourceLookup;

 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;

 /**
  * This class performs type inference by analyzing the _source documents. It uses a random sample of documents to infer the field type, similar to dynamic mapping type guessing logic.
@@ -57,7 +58,10 @@ public FieldTypeInference(String indexName, MapperService mapperService, IndexRe
     }

     public void setSampleSize(int sampleSize) {
-        this.sampleSize = Math.min(sampleSize, MAX_SAMPLE_SIZE_ALLOWED);
+        if (sampleSize > MAX_SAMPLE_SIZE_ALLOWED) {
+            throw new IllegalArgumentException("sample_size should be less than " + MAX_SAMPLE_SIZE_ALLOWED);
+        }
+        this.sampleSize = sampleSize;
     }

     public int getSampleSize() {
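
Note on the hunk above: setSampleSize() used to clamp oversized values silently via Math.min; it now fails fast. A minimal sketch of the new contract (hypothetical caller; the new tests below imply MAX_SAMPLE_SIZE_ALLOWED is 1000):

    FieldTypeInference inference = new FieldTypeInference("test_index", mapperService, indexReader);
    inference.setSampleSize(500);        // accepted and stored as-is
    try {
        inference.setSampleSize(1001);   // above the cap
    } catch (IllegalArgumentException e) {
        // message: "sample_size should be less than " + MAX_SAMPLE_SIZE_ALLOWED
    }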
Expand Down Expand Up @@ -95,11 +99,8 @@ private static class RandomSourceValuesGenerator implements Iterator<List<Object
private final ValueFetcher valueFetcher;
private final IndexReader indexReader;
private final SourceLookup sourceLookup;
private final int numLeaves;
private final int[] docs;
private int iter;
private int offset;
private LeafReaderContext leafReaderContext;
private int leaf;
private final int MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES = 10000;

@@ -113,31 +114,29 @@ public RandomSourceValuesGenerator(int sampleSize, IndexReader indexReader, Valu
                 Math.max(sampleSize, MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES)
             );
             this.iter = 0;
-            this.offset = 0;
-            this.leaf = 0;
-            this.numLeaves = indexReader.leaves().size();
+            this.leaf = -1;
             this.sourceLookup = new SourceLookup();
-            this.leafReaderContext = indexReader.leaves().get(leaf);
-            valueFetcher.setNextReader(leafReaderContext);
+            if (hasNext()) {
+                setNextLeaf();
+            }
         }

         @Override
         public boolean hasNext() {
-            return iter < docs.length && leaf < numLeaves;
+            return iter < docs.length && leaf < indexReader.leaves().size();
         }

+        /**
+         * Ensure hasNext() is called before calling next()
+         */
         @Override
         public List<Object> next() {
-            int docID = docs[iter] - offset;
-            if (docID >= leafReaderContext.reader().numDocs()) {
+            int docID = docs[iter] - indexReader.leaves().get(leaf).docBase;
+            if (docID >= indexReader.leaves().get(leaf).reader().numDocs()) {
                 setNextLeaf();
                 return next();
             }
-            // deleted docs are getting used to infer type, which should be okay?
-            sourceLookup.setSegmentAndDocument(leafReaderContext, docID);
+            sourceLookup.setSegmentAndDocument(indexReader.leaves().get(leaf), docs[iter] - indexReader.leaves().get(leaf).docBase);
             try {
                 iter++;
                 return valueFetcher.fetchValues(sourceLookup);
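
The rewrite above replaces the hand-maintained running offset with Lucene's per-leaf docBase. A self-contained sketch of that arithmetic (hypothetical helper, not part of the commit; ReaderUtil.subIndex and LeafReaderContext.docBase are real Lucene APIs):

    import java.util.List;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.ReaderUtil;

    final class LeafLocalDocId {
        // Maps a global doc ID to the segment-local ID that SourceLookup expects.
        static int toLocal(IndexReader reader, int globalDocId) {
            List<LeafReaderContext> leaves = reader.leaves();
            int leafIndex = ReaderUtil.subIndex(globalDocId, leaves); // leaf owning this global ID
            LeafReaderContext ctx = leaves.get(leafIndex);
            return globalDocId - ctx.docBase; // valid only within ctx.reader()
        }
    }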
@@ -147,29 +146,36 @@ public List<Object> next() {
         }

         private void setNextLeaf() {
-            offset += leafReaderContext.reader().numDocs();
-            leaf++;
-            if (leaf < numLeaves) {
-                leafReaderContext = indexReader.leaves().get(leaf);
-                valueFetcher.setNextReader(leafReaderContext);
+            int readerIndex = ReaderUtil.subIndex(docs[iter], indexReader.leaves());
+            if (readerIndex != leaf) {
+                leaf = readerIndex;
+            } else {
+                // this will only happen when leaves are exhausted and readerIndex will be indexReader.leaves().size() - 1.
+                leaf++;
+            }
+            if (leaf < indexReader.leaves().size()) {
+                valueFetcher.setNextReader(indexReader.leaves().get(leaf));
             }
         }

         private static int[] getSortedRandomNum(int sampleSize, int upperBound, int attempts) {
-            Set<Integer> generatedNumbers = new HashSet<>();
+            Set<Integer> generatedNumbers = new TreeSet<>();
             Random random = Randomness.get();
             int itr = 0;
-            while (generatedNumbers.size() < sampleSize && itr++ < attempts) {
-                int randomNumber = random.nextInt(upperBound);
-                generatedNumbers.add(randomNumber);
-            }
-            int[] result = new int[generatedNumbers.size()];
-            int i = 0;
-            for (int number : generatedNumbers) {
-                result[i++] = number;
-            }
-            Arrays.sort(result);
-            return result;
+            if (upperBound <= 10 * sampleSize) {
+                List<Integer> numberList = new ArrayList<>();
+                for (int i = 0; i < upperBound; i++) {
+                    numberList.add(i);
+                }
+                Collections.shuffle(numberList, random);
+                generatedNumbers.addAll(numberList.subList(0, sampleSize));
+            } else {
+                while (generatedNumbers.size() < sampleSize && itr++ < attempts) {
+                    int randomNumber = random.nextInt(upperBound);
+                    generatedNumbers.add(randomNumber);
+                }
+            }
+            return generatedNumbers.stream().mapToInt(Integer::valueOf).toArray();
         }
     }
 }
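
Design note on getSortedRandomNum: the TreeSet keeps draws sorted as they arrive (dropping the old Arrays.sort pass), and the strategy now depends on sample density. When the requested sample covers a tenth or more of the doc-ID space, shuffling the full range guarantees exactly sampleSize distinct IDs; otherwise rejection sampling avoids materializing the whole range, with the attempts cap bounding the loop. A runnable sketch of the sparse branch (hypothetical demo; plain java.util.Random stands in for Randomness.get()):

    import java.util.Random;
    import java.util.Set;
    import java.util.TreeSet;

    public final class SparseSampleDemo {
        public static void main(String[] args) {
            int sampleSize = 1000, upperBound = 1_000_000, attempts = 10_000, itr = 0;
            Random random = new Random(42);
            Set<Integer> picked = new TreeSet<>(); // iterates in ascending order
            while (picked.size() < sampleSize && itr++ < attempts) {
                picked.add(random.nextInt(upperBound)); // collisions rare when upperBound >> sampleSize
            }
            System.out.println("draws=" + itr + ", distinct=" + picked.size());
        }
    }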

server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java
@@ -53,15 +53,13 @@ public void testJsonSupportedTypes() throws IOException {
         when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
         int totalDocs = 10000;
         int docsPerLeafCount = 1000;
-        int leaves = 0;
         try (Directory dir = newDirectory()) {
             IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
             Document d = new Document();
             for (int i = 0; i < totalDocs; i++) {
                 iw.addDocument(d);
                 if ((i + 1) % docsPerLeafCount == 0) {
                     iw.commit();
-                    leaves++;
                 }
             }
             try (IndexReader reader = DirectoryReader.open(iw)) {
@@ -138,4 +136,75 @@ public void setNextReader(LeafReaderContext leafReaderContext) {
             }
         }
     }
+
+    public void testDeleteAllDocs() throws IOException {
+        MapperService mapperService = createMapperService(topMapping(b -> {}));
+        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
+        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
+        int totalDocs = 10000;
+        int docsPerLeafCount = 1000;
+        try (Directory dir = newDirectory()) {
+            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+            Document d = new Document();
+            for (int i = 0; i < totalDocs; i++) {
+                iw.addDocument(d);
+                if ((i + 1) % docsPerLeafCount == 0) {
+                    iw.commit();
+                }
+            }
+            iw.deleteAll();
+            iw.commit();
+
+            try (IndexReader reader = DirectoryReader.open(iw)) {
+                iw.close();
+                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
+                String[] fieldName = { "text_field" };
+                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertNull(mapper);
+            }
+        }
+    }
+
+    public void testZeroDoc() throws IOException {
+        MapperService mapperService = createMapperService(topMapping(b -> {}));
+        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
+        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
+        try (Directory dir = newDirectory()) {
+            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+            try (IndexReader reader = DirectoryReader.open(iw)) {
+                iw.close();
+                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
+                String[] fieldName = { "text_field" };
+                Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0]));
+                assertNull(mapper);
+            }
+        }
+    }
+
+    public void testSampleGeneration() throws IOException {
+        MapperService mapperService = createMapperService(topMapping(b -> {}));
+        QueryShardContext queryShardContext = createQueryShardContext(mapperService);
+        when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid"));
+        int totalDocs = 10000;
+        int docsPerLeafCount = 1000;
+        try (Directory dir = newDirectory()) {
+            IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+            Document d = new Document();
+            for (int i = 0; i < totalDocs; i++) {
+                iw.addDocument(d);
+                if ((i + 1) % docsPerLeafCount == 0) {
+                    iw.commit();
+                }
+            }
+            try (IndexReader reader = DirectoryReader.open(iw)) {
+                iw.close();
+                FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader);
+                typeInference.setSampleSize(1000 - 1);
+                typeInference.infer(lookup -> documentMap.get("unknown_field"));
+                assertThrows(IllegalArgumentException.class, () -> typeInference.setSampleSize(1000 + 1));
+                typeInference.setSampleSize(1000);
+                typeInference.infer(lookup -> documentMap.get("unknown_field"));
+            }
+        }
+    }
 }
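
A note on the sample sizes in testSampleGeneration: with 10000 documents, setSampleSize(999) drives getSortedRandomNum into the rejection-sampling branch (10 * 999 = 9990 < 10000), while setSampleSize(1000) crosses into the shuffle branch (10 * 1000 >= 10000), so both strategies get exercised along with the validation failure at 1001. A mirror of the branch condition for reference (hypothetical helper, not in the commit):

    // Returns true when getSortedRandomNum would shuffle the full [0, upperBound) range.
    static boolean usesShuffleBranch(int sampleSize, int upperBound) {
        return upperBound <= 10 * sampleSize;
    }
    // usesShuffleBranch(999, 10000)  -> false: rejection sampling
    // usesShuffleBranch(1000, 10000) -> true:  full shuffle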
