Skip to content

Commit

Permalink
[core] Introduce metadata.stats-dense-store to reduce meta size for m…
Browse files Browse the repository at this point in the history
…ultiple columns table (#4322)
  • Loading branch information
JingsongLi authored Oct 15, 2024
1 parent 5e5b2fd commit 11636ac
Show file tree
Hide file tree
Showing 61 changed files with 1,286 additions and 472 deletions.
3 changes: 3 additions & 0 deletions docs/content/flink/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ Paimon will automatically collect the statistics of the data file for speeding u
The statistics collector mode can be configured by `'metadata.stats-mode'`, by default is `'truncate(16)'`.
You can configure the field level by setting `'fields.{field_name}.stats-mode'`.

For the stats mode of `none`, we suggest that you configure `metadata.stats-dense-store` = `true`, which will
significantly reduce the storage size of the manifest.

### Field Default Value

Paimon table currently supports setting default values for fields in table properties by `'fields.item_id.default-value'`,
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br />Possible values:<ul><li>"deduplicate": De-duplicate and keep the last row.</li><li>"partial-update": Partial update non-null fields.</li><li>"aggregation": Aggregate fields with same primary key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
</tr>
<tr>
<td><h5>metadata.stats-dense-store</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to store statistic densely in metadata (manifest files), which will significantly reduce the storage size of metadata when the none statistic mode is set.<br />Note, when this mode is enabled, the Paimon sdk in reading engine requires at least version 0.9.1 or 1.0.0 or higher.</td>
</tr>
<tr>
<td><h5>metadata.stats-mode</h5></td>
<td style="word-wrap: break-word;">"truncate(16)"</td>
Expand Down
22 changes: 21 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ public class CoreOptions implements Serializable {
public static final String STATS_MODE_SUFFIX = "stats-mode";

public static final ConfigOption<String> METADATA_STATS_MODE =
key("metadata." + STATS_MODE_SUFFIX)
key("metadata.stats-mode")
.stringType()
.defaultValue("truncate(16)")
.withDescription(
Expand All @@ -1053,6 +1053,22 @@ public class CoreOptions implements Serializable {
+ STATS_MODE_SUFFIX))
.build());

public static final ConfigOption<Boolean> METADATA_STATS_DENSE_STORE =
key("metadata.stats-dense-store")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"Whether to store statistic densely in metadata (manifest files), which"
+ " will significantly reduce the storage size of metadata when the"
+ " none statistic mode is set.")
.linebreak()
.text(
"Note, when this mode is enabled, the Paimon sdk in reading engine requires"
+ " at least version 0.9.1 or 1.0.0 or higher.")
.build());

public static final ConfigOption<String> COMMIT_CALLBACKS =
key("commit.callbacks")
.stringType()
Expand Down Expand Up @@ -2233,6 +2249,10 @@ public boolean asyncFileWrite() {
return options.get(ASYNC_FILE_WRITE);
}

public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
*/
public class SimpleColStats {

public static final SimpleColStats NONE = new SimpleColStats(null, null, null);

@Nullable private final Object min;
@Nullable private final Object max;
private final Long nullCount;
Expand All @@ -58,6 +60,10 @@ public Long nullCount() {
return nullCount;
}

public boolean isNone() {
return min == null && max == null && nullCount == null;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof SimpleColStats)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public void collect(Object field, Serializer<Object> fieldSerializer) {}

@Override
public SimpleColStats result() {
return new SimpleColStats(null, null, null);
return SimpleColStats.NONE;
}

@Override
public SimpleColStats convert(SimpleColStats source) {
return new SimpleColStats(null, null, null);
return SimpleColStats.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public class SystemFields {
public static boolean isSystemField(int fieldId) {
return fieldId >= SYSTEM_FIELD_ID_START;
}

public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -198,7 +200,11 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) {
}
}

public static InternalArray toStringArrayData(List<String> list) {
public static InternalArray toStringArrayData(@Nullable List<String> list) {
if (list == null) {
return null;
}

return new GenericArray(list.stream().map(BinaryString::fromString).toArray());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;

/**
* An implementation of {@link InternalArray} which provides a projected view of the underlying
* {@link InternalArray}.
*
* <p>Projection includes both reducing the accessible fields and reordering them.
*
* <p>Note: This class supports only top-level projections, not nested projections.
*/
public class ProjectedArray implements InternalArray {

private final int[] indexMapping;

private InternalArray array;

private ProjectedArray(int[] indexMapping) {
this.indexMapping = indexMapping;
}

/**
* Replaces the underlying {@link InternalArray} backing this {@link ProjectedArray}.
*
* <p>This method replaces the row data in place and does not return a new object. This is done
* for performance reasons.
*/
public ProjectedArray replaceArray(InternalArray array) {
this.array = array;
return this;
}

// ---------------------------------------------------------------------------------------------

@Override
public int size() {
return indexMapping.length;
}

@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
return true;
}
return array.isNullAt(indexMapping[pos]);
}

@Override
public boolean getBoolean(int pos) {
return array.getBoolean(indexMapping[pos]);
}

@Override
public byte getByte(int pos) {
return array.getByte(indexMapping[pos]);
}

@Override
public short getShort(int pos) {
return array.getShort(indexMapping[pos]);
}

@Override
public int getInt(int pos) {
return array.getInt(indexMapping[pos]);
}

@Override
public long getLong(int pos) {
return array.getLong(indexMapping[pos]);
}

@Override
public float getFloat(int pos) {
return array.getFloat(indexMapping[pos]);
}

@Override
public double getDouble(int pos) {
return array.getDouble(indexMapping[pos]);
}

@Override
public BinaryString getString(int pos) {
return array.getString(indexMapping[pos]);
}

@Override
public Decimal getDecimal(int pos, int precision, int scale) {
return array.getDecimal(indexMapping[pos], precision, scale);
}

@Override
public Timestamp getTimestamp(int pos, int precision) {
return array.getTimestamp(indexMapping[pos], precision);
}

@Override
public byte[] getBinary(int pos) {
return array.getBinary(indexMapping[pos]);
}

@Override
public InternalArray getArray(int pos) {
return array.getArray(indexMapping[pos]);
}

@Override
public InternalMap getMap(int pos) {
return array.getMap(indexMapping[pos]);
}

@Override
public InternalRow getRow(int pos, int numFields) {
return array.getRow(indexMapping[pos], numFields);
}

@Override
public boolean equals(Object o) {
throw new UnsupportedOperationException("Projected row data cannot be compared");
}

@Override
public int hashCode() {
throw new UnsupportedOperationException("Projected row data cannot be hashed");
}

@Override
public String toString() {
throw new UnsupportedOperationException("Projected row data cannot be toString");
}

/**
* Create an empty {@link ProjectedArray} starting from a {@code projection} array.
*
* <p>The array represents the mapping of the fields of the original {@link DataType}. For
* example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd
* field and the 2nd field of the row.
*
* @see Projection
* @see ProjectedArray
*/
public static ProjectedArray from(int[] projection) {
return new ProjectedArray(projection);
}

@Override
public boolean[] toBooleanArray() {
throw new UnsupportedOperationException();
}

@Override
public byte[] toByteArray() {
throw new UnsupportedOperationException();
}

@Override
public short[] toShortArray() {
throw new UnsupportedOperationException();
}

@Override
public int[] toIntArray() {
throw new UnsupportedOperationException();
}

@Override
public long[] toLongArray() {
throw new UnsupportedOperationException();
}

@Override
public float[] toFloatArray() {
throw new UnsupportedOperationException();
}

@Override
public double[] toDoubleArray() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead;
private final boolean forceCompact;
private final boolean asyncFileWrite;
private final boolean statsDenseStore;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
Expand Down Expand Up @@ -111,7 +112,8 @@ public AppendOnlyWriter(
SimpleColStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite) {
boolean asyncFileWrite,
boolean statsDenseStore) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
Expand All @@ -122,6 +124,7 @@ public AppendOnlyWriter(
this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
this.asyncFileWrite = asyncFileWrite;
this.statsDenseStore = statsDenseStore;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
Expand Down Expand Up @@ -286,7 +289,8 @@ private RowDataRollingFileWriter createRollingRowWriter() {
statsCollectors,
fileIndexOptions,
FileSource.APPEND,
asyncFileWrite);
asyncFileWrite,
statsDenseStore);
}

private void trySyncLatestCompaction(boolean blocking)
Expand Down
Loading

0 comments on commit 11636ac

Please sign in to comment.