Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] reader buffer optimization #1816

Merged
merged 6 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: |
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION \
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION -Dclient.tests.useNewImplementation=true \
-Panalysis verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar
continue-on-error: true
54 changes: 44 additions & 10 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
import com.clickhouse.client.api.enums.Protocol;
Expand All @@ -20,6 +21,7 @@
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.insert.POJOSerializer;
import com.clickhouse.client.api.insert.SerializerNotFoundException;
import com.clickhouse.client.api.internal.BasicObjectsPool;
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -753,6 +756,22 @@ public Builder setMaxRetries(int maxRetries) {
return this;
}

/**
* Configures the client to reuse allocated byte buffers for numbers. It affects how the binary format reader works.
* If set to 'true' then {@link Client#newBinaryFormatReader(QueryResponse)} will construct a reader that
* reuses buffers for numbers. It improves performance for large datasets by reducing the number of allocations
* (and therefore GC pressure).
* Enabling this feature is safe because each reader is supposed to be used by a single thread and readers are
* not reused.
*
* Default is false.
* @param reuse - whether to reuse buffers
* @return this builder instance
*/
public Builder allowBinaryReaderToReuseBuffers(boolean reuse) {
// The flag is stored in its string form under the configuration key read by newBinaryFormatReader.
this.configuration.put("client_allow_binary_reader_to_reuse_buffers", String.valueOf(reuse));
return this;
}

public Client build() {
setDefaults();

Expand Down Expand Up @@ -866,6 +885,10 @@ private void setDefaults() {
if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {
setMaxRetries(3);
}

if (!configuration.containsKey("client_allow_binary_reader_to_reuse_buffers")) {
allowBinaryReaderToReuseBuffers(false);
}
}
}

Expand Down Expand Up @@ -1442,10 +1465,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
settings.waitEndOfQuery(true); // we rely on the summary

final QuerySettings finalSettings = settings;
return query(sqlQuery, settings).thenApply(response -> {
try {
return new Records(response, finalSettings);

return new Records(response, newBinaryFormatReader(response));
} catch (Exception e) {
throw new ClientException("Failed to get query response", e);
}
Expand All @@ -1462,13 +1485,14 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
public List<GenericRecord> queryAll(String sqlQuery) {
try {
int operationTimeout = getOperationTimeout();
QuerySettings settings = new QuerySettings().waitEndOfQuery(true);
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.waitEndOfQuery(true);
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery, settings).get() :
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
List<GenericRecord> records = new ArrayList<>();
if (response.getResultRows() > 0) {
RowBinaryWithNamesAndTypesFormatReader reader =
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
(RowBinaryWithNamesAndTypesFormatReader) newBinaryFormatReader(response);

Map<String, Object> record;
while (reader.readRecord((record = new LinkedHashMap<>()))) {
Expand Down Expand Up @@ -1569,28 +1593,38 @@ public CompletableFuture<CommandResponse> execute(String sql) {
* @param schema
* @return
*/
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
ClickHouseBinaryFormatReader reader = null;
// Using caching buffer allocator is risky, so it is opt-in: off by default, enabled via Builder#allowBinaryReaderToReuseBuffers
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers");
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
new BinaryStreamReader.CachingByteBufferAllocator() :
new BinaryStreamReader.DefaultByteBufferAllocator();

switch (response.getFormat()) {
case Native:
reader = new NativeFormatReader(response.getInputStream(), response.getSettings());
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
break;
case RowBinaryWithNamesAndTypes:
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
break;
case RowBinaryWithNames:
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema);
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
break;
case RowBinary:
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema);
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
break;
default:
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
}
return reader;
}

public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
return newBinaryFormatReader(response, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Map;
import java.util.UUID;

public interface ClickHouseBinaryFormatReader {
public interface ClickHouseBinaryFormatReader extends AutoCloseable {

/**
* Reads a single value from the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {

private int blockRowIndex;

public NativeFormatReader(InputStream inputStream) {
this(inputStream, null);
}

public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
super(inputStream, settings, null);
/**
* Creates a reader over the given stream and calls {@code readNextRecord()} right away.
* No table schema is passed to the superclass constructor (the schema argument is {@code null}).
*
* @param inputStream - stream to read from
* @param settings - query settings, forwarded to the superclass constructor
* @param byteBufferAllocator - byte buffer allocator, forwarded to the superclass constructor
*/
public NativeFormatReader(InputStream inputStream, QuerySettings settings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, settings, null, byteBufferAllocator);
readNextRecord();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.client.api.data_formats;

import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
Expand All @@ -12,12 +13,9 @@

public class RowBinaryFormatReader extends AbstractBinaryFormatReader {

public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {
this(inputStream, null, schema);
}

public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
super(inputStream, querySettings, schema);
/**
* Creates a reader over the given stream using the supplied schema and calls {@code readNextRecord()}
* right away.
*
* @param inputStream - stream to read from
* @param querySettings - query settings, forwarded to the superclass constructor
* @param schema - table schema, forwarded to the superclass constructor
* @param byteBufferAllocator - byte buffer allocator, forwarded to the superclass constructor
*/
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
readNextRecord();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -17,8 +16,9 @@

public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {

public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings) {
super(inputStream, querySettings, null);
/**
* Creates a reader over the given stream. No table schema is passed to the superclass constructor
* (the schema argument is {@code null}); {@code readSchema()} is called instead.
* NOTE(review): presumably {@code readSchema()} reads column names and types from the stream header - confirm.
*
* @param inputStream - stream to read from
* @param querySettings - query settings, forwarded to the superclass constructor
* @param byteBufferAllocator - byte buffer allocator, forwarded to the superclass constructor
*/
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, null, byteBufferAllocator);
readSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {

private List<String> columns = null;

public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
this(inputStream, null, schema);
readNextRecord();
}

public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
super(inputStream, querySettings, schema);
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
int nCol = 0;
try {
nCol = BinaryStreamReader.readVarInt(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.internal.BasicObjectsPool;
import com.clickhouse.client.api.internal.MapUtils;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.NullValueException;
Expand Down Expand Up @@ -41,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {

Expand All @@ -58,7 +60,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private volatile boolean hasNext = true;

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this.input = inputStream;
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
boolean useServerTimeZone = (boolean) this.settings.get(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey());
Expand All @@ -67,7 +70,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
if (timeZone == null) {
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
}
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
setSchema(schema);
}

Expand Down Expand Up @@ -133,12 +136,12 @@ protected void readNextRecord() {
try {
nextRecordEmpty.set(true);
if (!readRecord(nextRecord)) {
hasNext = false;
endReached();
} else {
nextRecordEmpty.compareAndSet(true, false);
}
} catch (IOException e) {
hasNext = false;
endReached();
throw new ClientException("Failed to read next row", e);
}
}
Expand All @@ -165,7 +168,7 @@ public Map<String, Object> next() {
return null;
}
} catch (IOException e) {
hasNext = false;
endReached();
throw new ClientException("Failed to read row", e);
}
}
Expand Down Expand Up @@ -621,4 +624,9 @@ public LocalDateTime getLocalDateTime(int index) {
}
return (LocalDateTime) value;
}

/**
* Closes the input stream this reader was constructed with.
*
* @throws Exception if closing the input stream fails
*/
@Override
public void close() throws Exception {
input.close();
}
}
Loading