Skip to content

Commit

Permalink
[FEATURE]: Add Support ProgressListener in ClickHouseStatement (#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrybugakov authored Sep 26, 2023
1 parent 2c97af7 commit 4dda6e8
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@

package com.github.housepower.jdbc.statement;

import com.github.housepower.jdbc.ClickHouseConnection;
import com.github.housepower.jdbc.ClickHouseResultSet;
import com.github.housepower.client.NativeContext;
import com.github.housepower.data.Block;
import com.github.housepower.jdbc.ClickHouseConnection;
import com.github.housepower.jdbc.ClickHouseResultSet;
import com.github.housepower.jdbc.wrapper.SQLStatement;
import com.github.housepower.log.Logger;
import com.github.housepower.log.LoggerFactory;
import com.github.housepower.misc.ExceptionUtil;
import com.github.housepower.misc.Validate;
import com.github.housepower.stream.QueryResult;
import com.github.housepower.protocol.listener.ProgressListener;
import com.github.housepower.settings.ClickHouseConfig;
import com.github.housepower.settings.SettingKey;
import com.github.housepower.jdbc.wrapper.SQLStatement;
import com.github.housepower.stream.ClickHouseQueryResult;
import com.github.housepower.stream.QueryResult;
import com.github.housepower.stream.ValuesNativeInputFormat;

import java.sql.Connection;
Expand All @@ -48,6 +50,7 @@ public class ClickHouseStatement implements SQLStatement {
protected Block block;
protected final ClickHouseConnection connection;
protected final NativeContext nativeContext;
private ProgressListener progressListener;

private ClickHouseConfig cfg;
private long maxRows;
Expand Down Expand Up @@ -91,6 +94,11 @@ public int executeUpdate(String query) throws SQLException {
}
updateCount = -1;
QueryResult result = connection.sendQueryRequest(query, cfg);

if (result instanceof ClickHouseQueryResult) {
((ClickHouseQueryResult) result).setProgressListener(this.progressListener);
}

lastResultSet = new ClickHouseResultSet(this, cfg, db, table, result.header(), result.data());
return 0;
});
Expand Down Expand Up @@ -162,6 +170,10 @@ public void setQueryTimeout(int seconds) {
this.cfg = cfg.withQueryTimeout(Duration.ofSeconds(seconds));
}

public void setProgressListener(ProgressListener listener) {
this.progressListener = listener;
}

@Override
public void setFetchDirection(int direction) throws SQLException {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
public class ProgressResponse implements Response {

public static ProgressResponse readFrom(BinaryDeserializer deserializer) throws IOException {
return new ProgressResponse(deserializer.readVarInt(), deserializer.readVarInt(), deserializer.readVarInt());
return new ProgressResponse(
deserializer.readVarInt(),
deserializer.readVarInt(),
deserializer.readVarInt()
);
}

private final long newRows;
Expand Down Expand Up @@ -50,4 +54,13 @@ public long newBytes() {
public long newTotalRows() {
return newTotalRows;
}

@Override
public String toString() {
return "ProgressResponse {" +
"newRows=" + newRows +
", newBytes=" + newBytes +
", newTotalRows=" + newTotalRows +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.housepower.protocol.listener;

import com.github.housepower.log.Logger;
import com.github.housepower.log.LoggerFactory;
import com.github.housepower.protocol.ProgressResponse;

public class DefaultProgressListener implements ProgressListener {

private static final Logger LOG = LoggerFactory.getLogger(DefaultProgressListener.class);

private DefaultProgressListener() {
}

public static DefaultProgressListener create() {
return new DefaultProgressListener();
}

@Override
public void onProgress(ProgressResponse progressResponse) {
LOG.info("DefaultProgressListener: ".concat(progressResponse.toString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.housepower.protocol.listener;

import com.github.housepower.protocol.ProgressResponse;

public interface ProgressListener {
void onProgress(ProgressResponse progressResponse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import com.github.housepower.protocol.EOFStreamResponse;
import com.github.housepower.protocol.ProgressResponse;
import com.github.housepower.protocol.Response;
import com.github.housepower.protocol.listener.ProgressListener;

import java.sql.SQLException;

public class ClickHouseQueryResult implements QueryResult {

public class ClickHouseQueryResult implements QueryResult {
private final CheckedSupplier<Response, SQLException> responseSupplier;
private ProgressListener progressListener;
private Block header;
private boolean atEnd;
// Progress
// Totals
// Extremes
// ProfileInfo
Expand All @@ -39,6 +40,15 @@ public ClickHouseQueryResult(CheckedSupplier<Response, SQLException> responseSup
this.responseSupplier = responseSupplier;
}

public ClickHouseQueryResult(CheckedSupplier<Response, SQLException> responseSupplier, ProgressListener progressListener) {
this.progressListener = progressListener;
this.responseSupplier = responseSupplier;
}

public void setProgressListener(ProgressListener progressListener) {
this.progressListener = progressListener;
}

@Override
public Block header() throws SQLException {
ensureHeaderConsumed();
Expand Down Expand Up @@ -97,6 +107,9 @@ private DataResponse consumeDataResponse() throws SQLException {
} else if (response instanceof EOFStreamResponse || response == null) {
atEnd = true;
} else if (response instanceof ProgressResponse) {
if (progressListener != null) {
progressListener.onProgress((ProgressResponse) response);
}
readRows += ((ProgressResponse) response).newRows();
readBytes += ((ProgressResponse) response).newBytes();
}
Expand Down

0 comments on commit 4dda6e8

Please sign in to comment.