Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Apr 16, 2024
1 parent 876ef34 commit a006ada
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 11 deletions.
23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,26 @@ query I
select count(*) from tt3_rw;
----
2

query III rowsort
select * from numeric_to_rw_int256
----

query III rowsort
select * from numeric_to_varchar
----
1 3.14
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555

# The invalid data for rw_int256 is converted to NULL
query III rowsort
select * from numeric_to_rw_int256
----
1 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL
29 changes: 28 additions & 1 deletion e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,31 @@ SELECT * FROM rw.products_test order by id limit 3
query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0
2022-12-01 15:08:22 Sam 110 0

query III rowsort
select * from numeric_to_varchar
----
1 3.14
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 57896044618658097711785492504343953926634992332820282019728792003956564819968
104 115792089237316195423570985008687907853269984665640564039457584007913129639936
105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555

# The invalid data for rw_int256 is converted to NULL
query III rowsort
select * from numeric_to_rw_int256
----
1 NULL
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 NULL
104 NULL
105 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL
33 changes: 33 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,36 @@ create table person_rw (
publication.name='dumb_publicaton',
publication.create.enable='false'
);

statement ok
create table numeric_to_rw_int256 (
id int,
num rw_int256,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name = 'numeric_table',
slot.name = 'numeric_to_rw_int256'
);

statement ok
create table numeric_to_varchar (
id int,
num varchar,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_varchar'
);
59 changes: 59 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,62 @@ SELECT * from person_new order by id;
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles

statement ok
CREATE TABLE numeric_to_rw_int256_shared (
id int,
num rw_int256,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';

statement ok
CREATE TABLE numeric_to_varchar_shared (
id int,
num varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';


system ok
psql -c "
insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
"

sleep 3s

query III rowsort
select * from numeric_to_varchar_shared
----
1 3.14
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 57896044618658097711785492504343953926634992332820282019728792003956564819968
104 115792089237316195423570985008687907853269984665640564039457584007913129639936
105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555

# The invalid data for rw_int256 is converted to NULL
query III rowsort
select * from numeric_to_rw_int256_shared
----
1 NULL
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 NULL
104 NULL
105 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL

system ok
psql -c "
DELETE FROM numeric_table WHERE id IN (102, 103, 104, 105);
"
34 changes: 34 additions & 0 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,37 @@ create table shipments (
table.name = 'shipments',
slot.name = 'shipments'
) format canal encode csv;

statement ok
explain create table numeric_to_rw_int256 (
id int,
num rw_int256,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = 'cdc_test',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_rw_int256'
);

statement ok
explain create table numeric_to_varchar (
id int,
num varchar,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = 'cdc_test',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_varchar'
);
10 changes: 10 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,13 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');

create table numeric_table(id int PRIMARY KEY, num numeric);
insert into numeric_table values(1, 3.14);
--- 2^255 - 1
insert into numeric_table values(2, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(3, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(4, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(5, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
7 changes: 7 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ insert into abs.t1 values (2, 2.2, 'bbb', '1234.5431');
SELECT pg_current_wal_lsn();
select * from pg_publication_tables where pubname='rw_publication';
select * from public.person order by id;

insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
20 changes: 10 additions & 10 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
DataType::Int256 => postgres_decimal_to_rw_int256(&row, i, name, None),
DataType::Varchar => {
match row.columns()[i].type_() {
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
&Type::UUID => {
Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Expand All @@ -164,7 +164,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
// support converting NUMERIC to VARCHAR implicitly
&Type::NUMERIC => postgres_decimal_to_varchar(&row, i, name, None),
Type::NUMERIC => postgres_decimal_to_varchar(&row, i, name, None),
_ => {
handle_data_type!(row, i, name, String)
}
Expand Down Expand Up @@ -233,9 +233,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
handle_list_data_type!(row, i, name, NaiveDate, builder, Date);
}
DataType::Varchar => {
match row.columns()[i].type_() {
match *row.columns()[i].type_() {
// Since we don't support UUID natively, adapt it to a VARCHAR column
&Type::UUID => {
Type::UUID => {
let res = row.try_get::<_, Option<uuid::Uuid>>(i);
match res {
Ok(val) => {
Expand All @@ -257,7 +257,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
};
}
// support converting NUMERIC to VARCHAR implicitly
&Type::NUMERIC => {
Type::NUMERIC => {
let _ = postgres_decimal_to_varchar(&row, i, name, None);
}
_ => {
Expand Down Expand Up @@ -390,8 +390,8 @@ fn postgres_decimal_to_rw_int256(
let string = postgres_decimal_to_string(row, idx, name)?;
match Int256::from_str(string.as_str()) {
Ok(num) => {
if builder.is_some() {
builder.unwrap().append(Some(ScalarImpl::from(num.clone())));
if let Some(builder) = builder {
builder.append(Some(ScalarImpl::from(num.clone())));
None
} else {
Some(ScalarImpl::from(num))
Expand Down Expand Up @@ -421,8 +421,8 @@ fn postgres_decimal_to_varchar(
// we use the PgNumeric type to convert the decimal to a string.
let string = postgres_decimal_to_string(row, idx, name)?;

if builder.is_some() {
builder.unwrap().append(Some(ScalarImpl::from(string)));
if let Some(builder) = builder {
builder.append(Some(ScalarImpl::from(string)));
None
} else {
Some(ScalarImpl::from(string))
Expand Down

0 comments on commit a006ada

Please sign in to comment.