diff --git a/server/src/main/java/org/opensearch/common/util/ByteArrayBackedBitset.java b/server/src/main/java/org/opensearch/common/util/ByteArrayBackedBitset.java index 2d7948d414937..a0c14ac8e9116 100644 --- a/server/src/main/java/org/opensearch/common/util/ByteArrayBackedBitset.java +++ b/server/src/main/java/org/opensearch/common/util/ByteArrayBackedBitset.java @@ -8,11 +8,10 @@ package org.opensearch.common.util; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RandomAccessInput; import java.io.IOException; +import java.nio.ByteBuffer; /** * A bitset backed by a byte array. This will initialize and set bits in the byte array based on the index. @@ -39,18 +38,6 @@ public ByteArrayBackedBitset(RandomAccessInput in, long offset, int length) thro } } - /** - * Constructor which set the Lucene's IndexInput to read the bitset into a read-only buffer. - */ - public ByteArrayBackedBitset(IndexInput in, int length) throws IOException { - byteArray = new byte[length]; - int i = 0; - while (i < length) { - byteArray[i] = in.readByte(); - i++; - } - } - /** * Sets the bit at the given index to 1. * Each byte can indicate 8 bits, so the index is divided by 8 to get the byte array index. @@ -61,10 +48,10 @@ public void set(int index) { byteArray[byteArrIndex] |= (byte) (1 << (index & 7)); } - public int write(IndexOutput output) throws IOException { + public int write(ByteBuffer output) throws IOException { int numBytes = 0; for (Byte bitSet : byteArray) { - output.writeByte(bitSet); + output.put(bitSet); numBytes += Byte.BYTES; } return numBytes; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java index 3f4e302e0a0f2..e91505d180105 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractDocumentsFileManager.java @@ -16,6 +16,8 @@ import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.NumericUtils; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; @@ -24,6 +26,8 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE; @@ -67,54 +71,84 @@ private void setDocSizeInBytes(int numBytes) { } /** - * Write the star tree document to file associated with dimensions and metrics + * Write the star tree document to a byte buffer */ protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException { - int numBytes = writeDimensions(starTreeDocument, output); - numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc); + int numBytes = calculateDocumentSize(starTreeDocument, isAggregatedDoc); + byte[] bytes = new byte[numBytes]; + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()); + writeDimensions(starTreeDocument, buffer); + if (isAggregatedDoc == false) { + writeFlushMetrics(starTreeDocument, buffer); + } else { + writeMetrics(starTreeDocument, buffer, isAggregatedDoc); + } + output.writeBytes(bytes, bytes.length); setDocSizeInBytes(numBytes); - return numBytes; + return bytes.length; } /** - * Write dimensions to file + * Write dimensions to the byte buffer */ - protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException { - int numBytes = 0; - for (int i = 0; i < starTreeDocument.dimensions.length; i++) { - output.writeLong(starTreeDocument.dimensions[i] == null ? 0L : starTreeDocument.dimensions[i]); - numBytes += Long.BYTES; + protected void writeDimensions(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException { + for (Long dimension : starTreeDocument.dimensions) { + buffer.putLong(dimension == null ? 0L : dimension); } - numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, output); - return numBytes; + StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, buffer); } /** * Write star tree document metrics to file */ - protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException { - int numBytes = 0; + protected void writeFlushMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException { + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]); + } + StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer); + } + + /** + * Write star tree document metrics to the byte buffer + */ + protected void writeMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer, boolean isAggregatedDoc) throws IOException { for (int i = 0; i < starTreeDocument.metrics.length; i++) { FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType(); if (aggregatedValueType.equals(LONG)) { - output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]); - numBytes += Long.BYTES; + buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]); } else if (aggregatedValueType.equals(DOUBLE)) { if (isAggregatedDoc) { long val = NumericUtils.doubleToSortableLong( starTreeDocument.metrics[i] == null ? 0.0 : (Double) starTreeDocument.metrics[i] ); - output.writeLong(val); + buffer.putLong(val); } else { - output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]); + buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]); } - numBytes += Long.BYTES; } else { throw new IllegalStateException("Unsupported metric type"); } } - numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output); - return numBytes; + StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer); + } + + /** + * Calculate the size of the serialized StarTreeDocument + */ + private int calculateDocumentSize(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) { + int size = starTreeDocument.dimensions.length * Long.BYTES; + size += getLength(starTreeDocument.dimensions); + + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + size += Long.BYTES; + } + size += getLength(starTreeDocument.metrics); + + return size; + } + + private static int getLength(Object[] array) { + return (array.length / 8) + (array.length % 8 == 0 ? 0 : 1); } /** @@ -132,7 +166,11 @@ protected StarTreeDocument readStarTreeDocument(RandomAccessInput input, long of offset = readDimensions(dimensions, input, offset); Object[] metrics = new Object[numMetrics]; - offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc); + if (isAggregatedDoc == false) { + offset = readMetrics(input, offset, metrics); + } else { + offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc); + } assert (offset - initialOffset) == docSizeInBytes; return new StarTreeDocument(dimensions, metrics); } @@ -154,10 +192,32 @@ protected long readDimensions(Long[] dimensions, RandomAccessInput input, long o return offset; } + /** + * Read metrics based on metric field values. Then we reuse the metric field values to each of the metric stats. + */ + private long readMetrics(RandomAccessInput input, long offset, Object[] metrics) throws IOException { + Object[] fieldMetrics = new Object[starTreeField.getMetrics().size()]; + for (int i = 0; i < starTreeField.getMetrics().size(); i++) { + fieldMetrics[i] = input.readLong(offset); + offset += Long.BYTES; + } + offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, fieldMetrics, index -> null); + int fieldIndex = 0; + int numMetrics = 0; + for (Metric metric : starTreeField.getMetrics()) { + for (MetricStat stat : metric.getBaseMetrics()) { + metrics[numMetrics] = fieldMetrics[fieldIndex]; + numMetrics++; + } + fieldIndex++; + } + return offset; + } + /** * Read star tree metrics from file */ - protected long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc) + private long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc) throws IOException { for (int i = 0; i < numMetrics; i++) { FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType(); diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index 2d4938eeb45b3..c3ea04d52e892 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -183,6 +183,7 @@ public List generateMetricAggregatorInfos(MapperService ma /** * Generates the configuration required to perform aggregation for all the metrics on a field + * Each metric field is associated with a metric reader * * @return list of MetricAggregatorInfo */ @@ -191,24 +192,20 @@ public List getMetricReaders(SegmentWriteState stat List metricReaders = new ArrayList<>(); for (Metric metric : this.starTreeField.getMetrics()) { - for (MetricStat metricStat : metric.getBaseMetrics()) { - SequentialDocValuesIterator metricReader; - FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); - if (metricStat.equals(MetricStat.DOC_COUNT)) { - // _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator - metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME); - } else { - if (metricFieldInfo == null) { - metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC); - } - metricReader = new SequentialDocValuesIterator( - new SortedNumericStarTreeValuesIterator( - fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) - ) - ); + SequentialDocValuesIterator metricReader; + FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); + if (metric.getField().equals(DocCountFieldMapper.NAME)) { + metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME); + } else { + if (metric.getBaseMetrics().isEmpty()) continue; + if (metricFieldInfo == null) { + metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC); } - metricReaders.add(metricReader); + metricReader = new SequentialDocValuesIterator( + new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)) + ); } + metricReaders.add(metricReader); } return metricReaders; } @@ -572,11 +569,20 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte */ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List metricsReaders) throws IOException { Object[] metrics = new Object[numMetrics]; - for (int i = 0; i < numMetrics; i++) { - SequentialDocValuesIterator metricStatReader = metricsReaders.get(i); - if (metricStatReader != null) { + int metricIndex = 0; + for (int i = 0; i < starTreeField.getMetrics().size(); i++) { + Metric metric = starTreeField.getMetrics().get(i); + if (metric.getBaseMetrics().isEmpty()) continue; + SequentialDocValuesIterator metricReader = metricsReaders.get(i); + if (metricReader != null) { try { - metricStatReader.nextEntry(currentDocId); + metricReader.nextEntry(currentDocId); + Object metricValue = metricReader.value(currentDocId); + + for (MetricStat metricStat : metric.getBaseMetrics()) { + metrics[metricIndex] = metricValue; + metricIndex++; + } } catch (IOException e) { logger.error("unable to iterate to next doc", e); throw new RuntimeException("unable to iterate to next doc", e); @@ -584,9 +590,8 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List sortAndAggregateSegmentDocuments( } try { for (int i = 0; i < totalSegmentDocs; i++) { - StarTreeDocument document = getSegmentStarTreeDocument(i, dimensionReaders, metricReaders); + StarTreeDocument document = getSegmentStarTreeDocumentWithMetricFieldValues(i, dimensionReaders, metricReaders); segmentDocumentFileManager.writeStarTreeDocument(document, false); } } catch (IOException ex) { @@ -128,6 +128,45 @@ public Iterator sortAndAggregateSegmentDocuments( return sortAndReduceDocuments(sortedDocIds, totalSegmentDocs, false); } + /** + * Returns the star-tree document from the segment based on the current doc id + */ + StarTreeDocument getSegmentStarTreeDocumentWithMetricFieldValues( + int currentDocId, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { + Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders); + Object[] metricValues = getStarTreeMetricFieldValuesFromSegment(currentDocId, metricReaders); + return new StarTreeDocument(dimensions, metricValues); + } + + /** + * Returns the metric field values for the star-tree document from the segment based on the current doc id + */ + private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List metricReaders) { + Object[] metricValues = new Object[starTreeField.getMetrics().size()]; + for (int i = 0; i < starTreeField.getMetrics().size(); i++) { + if (starTreeField.getMetrics().get(i).getBaseMetrics().isEmpty()) continue; + SequentialDocValuesIterator metricReader = metricReaders.get(i); + if (metricReader != null) { + try { + metricReader.nextEntry(currentDocId); + metricValues[i] = metricReader.value(currentDocId); + } catch (IOException e) { + logger.error("unable to iterate to next doc", e); + throw new RuntimeException("unable to iterate to next doc", e); + } catch (Exception e) { + logger.error("unable to read the metric values from the segment", e); + throw new IllegalStateException("unable to read the metric values from the segment", e); + } + } else { + throw new IllegalStateException("metric reader is empty"); + } + } + return metricValues; + } + /** * Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly * aggregated star-tree documents diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtil.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtil.java index a508e497adcdf..4e7ec30f23c3b 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtil.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtil.java @@ -8,34 +8,17 @@ package org.opensearch.index.compositeindex.datacube.startree.utils; -import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RandomAccessInput; import org.opensearch.common.util.ByteArrayBackedBitset; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.function.Function; /** * Helper class to read/write bitset for null values and identity values. */ public class StarTreeDocumentBitSetUtil { - /** - * Write bitset for null values. - * - * @param array array of objects - * @param output output stream - * @return number of bytes written - * @throws IOException if an I/O error occurs while writing to the output stream - */ - public static int writeBitSet(Object[] array, IndexOutput output) throws IOException { - ByteArrayBackedBitset bitset = new ByteArrayBackedBitset(getLength(array)); - for (int i = 0; i < array.length; i++) { - if (array[i] == null) { - bitset.set(i); - } - } - return bitset.write(output); - } /** * Set identity values based on bitset. @@ -51,6 +34,19 @@ public static int readBitSet(RandomAccessInput input, long offset, Object[] arra return bitset.getCurrBytesRead(); } + /** + * Write the bitset for the given array to the ByteBuffer + */ + public static void writeBitSet(Object[] array, ByteBuffer buffer) throws IOException { + ByteArrayBackedBitset bitset = new ByteArrayBackedBitset(getLength(array)); + for (int i = 0; i < array.length; i++) { + if (array[i] == null) { + bitset.set(i); + } + } + bitset.write(buffer); + } + private static int getLength(Object[] array) { return (array.length / 8) + (array.length % 8 == 0 ? 0 : 1); } diff --git a/server/src/test/java/org/opensearch/common/util/ByteArrayBackedBitsetTests.java b/server/src/test/java/org/opensearch/common/util/ByteArrayBackedBitsetTests.java index 6750a9e48f033..f07c363372333 100644 --- a/server/src/test/java/org/opensearch/common/util/ByteArrayBackedBitsetTests.java +++ b/server/src/test/java/org/opensearch/common/util/ByteArrayBackedBitsetTests.java @@ -16,6 +16,8 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.file.Path; /** @@ -39,7 +41,10 @@ private static void testWriteAndReadBitset(int randomArraySize, int randomIndex1 IndexOutput indexOutput = fsDirectory.createOutput(TEST_FILE, IOContext.DEFAULT); bitset.set(randomIndex1); bitset.set(randomIndex2); - bitset.write(indexOutput); + byte[] bytes = new byte[randomArraySize]; + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()); + bitset.write(buffer); + indexOutput.writeBytes(bytes, bytes.length); indexOutput.close(); IndexInput in = fsDirectory.openInput(TEST_FILE, IOContext.DEFAULT); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtilTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtilTests.java index 7d1bd37246fae..6c9923898b729 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtilTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDocumentBitSetUtilTests.java @@ -16,6 +16,8 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.file.Path; import java.util.Arrays; import java.util.function.Function; @@ -46,7 +48,11 @@ void testNullBasedOnBitset(Long[] dims) throws IOException { FSDirectory fsDirectory = FSDirectory.open(basePath); String TEST_FILE = "test_file"; IndexOutput indexOutput = fsDirectory.createOutput(TEST_FILE, IOContext.DEFAULT); - StarTreeDocumentBitSetUtil.writeBitSet(dims, indexOutput); + int numBytes = getLength(dims); + byte[] bytes = new byte[numBytes]; + ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()); + StarTreeDocumentBitSetUtil.writeBitSet(dims, buffer); + indexOutput.writeBytes(bytes, numBytes); indexOutput.close(); // test null value on read @@ -69,4 +75,8 @@ void testNullBasedOnBitset(Long[] dims) throws IOException { assertEquals(randomLong, (long) dims1[randomNullIndex2]); in.close(); } + + private static int getLength(Object[] array) { + return (array.length / 8) + (array.length % 8 == 0 ? 0 : 1); + } }