Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(jdbc-sink): fix deliver DELETE event for row with uuid as primary key #16447

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@ query IT
select * from t1_uuid;
----
221 74605c5a-a7bb-4b3b-8742-2a12e9709dea hello world


query T
select * from sk_t1_uuid
----
21189447-8736-44bd-b254-26b5dec91da9
20 changes: 20 additions & 0 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,26 @@ CREATE SINK s1_uuid FROM t1_uuid WITH (
statement ok
INSERT INTO t1_uuid values (221, '74605c5a-a7bb-4b3b-8742-2a12e9709dea', 'hello world');


statement ok
CREATE TABLE t1_test_uuid_delete (id varchar, primary key(id));

statement ok
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'), ('21189447-8736-44bd-b254-26b5dec91da9');

statement ok
CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='sk_t1_uuid',
primary_key='id',
type='upsert'
);

statement ok
DELETE FROM t1_test_uuid_delete WHERE ID='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8';


statement ok
INSERT INTO tt2 VALUES
(1),
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,5 @@ CREATE TABLE biz.t_types (
CREATE TABLE biz.t2 (
"aBc" INTEGER PRIMARY KEY
);

CREATE TABLE sk_t1_uuid (id uuid, primary key(id));
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.Status;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,20 +59,27 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
// column name -> java.sql.Types
Map<String, Integer> columnTypeMapping =
getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName());
// create an array that each slot corresponding to each column in TableSchema
var columnSqlTypes = new int[tableSchema.getNumColumns()];
for (int columnIdx = 0; columnIdx < tableSchema.getNumColumns(); columnIdx++) {
var columnName = tableSchema.getColumnNames()[columnIdx];
columnSqlTypes[columnIdx] = columnTypeMapping.get(columnName);
}

// A vector of upstream column types
List<Integer> columnSqlTypes =
Arrays.stream(tableSchema.getColumnNames())
.map(columnTypeMapping::get)
.collect(Collectors.toList());

List<Integer> pkIndices =
tableSchema.getPrimaryKeys().stream()
.map(tableSchema::getColumnIndex)
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}: columnSqlTypes = {}",
"schema = {}, table = {}, columnSqlTypes = {}, pkIndices = {}",
config.getSchemaName(),
config.getTableName(),
Arrays.toString(columnSqlTypes));
columnSqlTypes,
pkIndices);

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes);
this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices);
} else {
throw Status.INVALID_ARGUMENT
.withDescription("Unsupported jdbc url: " + jdbcUrl)
Expand Down Expand Up @@ -303,11 +311,7 @@ public void prepareDelete(SinkRow row) {
.asRuntimeException();
}
try {
int placeholderIdx = 1;
for (String primaryKey : pkColumnNames) {
Object fromRow = tableSchema.getFromRow(primaryKey, row);
deleteStatement.setObject(placeholderIdx++, fromRow);
}
jdbcDialect.bindDeleteStatement(deleteStatement, row);
deleteStatement.addBatch();
} catch (SQLException e) {
throw Status.INTERNAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@ void bindUpsertStatement(
void bindInsertIntoStatement(
PreparedStatement stmt, Connection conn, TableSchema tableSchema, SinkRow row)
throws SQLException;

/** Bind the values of primary key fields to the {@code DELETE} statement. */
void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public interface JdbcDialectFactory {

JdbcDialect create(int[] columnSqlTypes);
JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@

public class MySqlDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public MySqlDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

@Override
public SchemaTableName createSchemaTableName(String schemaName, String tableName) {
return new SchemaTableName(schemaName, tableName);
Expand Down Expand Up @@ -100,4 +116,14 @@ public void bindInsertIntoStatement(
}
}
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public class MySqlDialectFactory implements JdbcDialectFactory {

@Override
public JdbcDialect create(int[] columnSqlTypes) {
return new MySqlDialect();
public JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
return new MySqlDialect(columnSqlTypes, pkIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,19 @@
public class PostgresDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public PostgresDialect(int[] columnSqlTypes) {
this.columnSqlTypes = columnSqlTypes;
public PostgresDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

private static final HashMap<TypeName, String> RW_TYPE_TO_JDBC_TYPE_NAME;
Expand Down Expand Up @@ -154,4 +164,14 @@ public void bindInsertIntoStatement(
}
}
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public class PostgresDialectFactory implements JdbcDialectFactory {

@Override
public JdbcDialect create(int[] columnSqlTypes) {
return new PostgresDialect(columnSqlTypes);
public JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
return new PostgresDialect(columnSqlTypes, pkIndices);
}
}
Loading