diff --git a/e2e_test/source/cdc/cdc.check.slt b/e2e_test/source/cdc/cdc.check.slt index b90137948be1e..e94ea36f2735d 100644 --- a/e2e_test/source/cdc/cdc.check.slt +++ b/e2e_test/source/cdc/cdc.check.slt @@ -46,3 +46,26 @@ query I select count(*) from tt3_rw; ---- 2 + +query III rowsort +select * from numeric_to_rw_int256 +---- + +query III rowsort +select * from numeric_to_varchar +---- +1 3.14 +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 57896044618658097711785492504343953926634992332820282019728792003956564819968 +4 115792089237316195423570985008687907853269984665640564039457584007913129639936 +5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555 + +# The invalid data for rw_int256 is converted to NULL +query III rowsort +select * from numeric_to_rw_int256 +---- +1 NULL +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 NULL +4 NULL +5 NULL diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt index 5a97a21ba973a..09a58b099246d 100644 --- a/e2e_test/source/cdc/cdc.check_new_rows.slt +++ b/e2e_test/source/cdc/cdc.check_new_rows.slt @@ -77,4 +77,31 @@ SELECT * FROM rw.products_test order by id limit 3 query TTTT select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id; ---- -2022-12-01 15:08:22 Sam 110 0 \ No newline at end of file +2022-12-01 15:08:22 Sam 110 0 + +query III rowsort +select * from numeric_to_varchar +---- +1 3.14 +102 57896044618658097711785492504343953926634992332820282019728792003956564819967 +103 57896044618658097711785492504343953926634992332820282019728792003956564819968 +104 115792089237316195423570985008687907853269984665640564039457584007913129639936 +105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555 +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 57896044618658097711785492504343953926634992332820282019728792003956564819968 +4 115792089237316195423570985008687907853269984665640564039457584007913129639936 +5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555 + +# The invalid data for rw_int256 is converted to NULL +query III rowsort +select * from numeric_to_rw_int256 +---- +1 NULL +102 57896044618658097711785492504343953926634992332820282019728792003956564819967 +103 NULL +104 NULL +105 NULL +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 NULL +4 NULL +5 NULL diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index 7dda34fe08518..1cff579bb054d 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -193,3 +193,36 @@ create table person_rw ( publication.name='dumb_publicaton', publication.create.enable='false' ); + +statement ok +create table numeric_to_rw_int256 ( + id int, + num rw_int256, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + table.name = 'numeric_table', + slot.name = 'numeric_to_rw_int256' +); + +statement ok +create table numeric_to_varchar ( + id int, + num varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'numeric_table', + slot.name = 'numeric_to_varchar' +); diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index dc59de5f48005..0b3db45b4a933 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -268,3 +268,62 @@ SELECT * from person_new order by id; 1100 noris ypl@qbxfg.com 1864 2539 enne 1101 white myc@xpmpe.com 8157 6974 se 1102 spencer wip@dkaap.com 9481 6270 angeles + +statement ok +CREATE TABLE numeric_to_rw_int256_shared ( + id int, + num rw_int256, + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.numeric_table'; + +statement ok +CREATE TABLE numeric_to_varchar_shared ( + id int, + num varchar, + PRIMARY KEY (id) +) FROM pg_source TABLE 'public.numeric_table'; + + +system ok +psql -c " +insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967); +--- 2^255 +insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968); +--- 2^256 +insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936); +insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555); +" + +sleep 3s + +query III rowsort +select * from numeric_to_varchar_shared +---- +1 3.14 +102 57896044618658097711785492504343953926634992332820282019728792003956564819967 +103 57896044618658097711785492504343953926634992332820282019728792003956564819968 +104 115792089237316195423570985008687907853269984665640564039457584007913129639936 +105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555 +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 57896044618658097711785492504343953926634992332820282019728792003956564819968 +4 115792089237316195423570985008687907853269984665640564039457584007913129639936 +5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555 + +# The invalid data for rw_int256 is converted to NULL +query III rowsort +select * from numeric_to_rw_int256_shared +---- +1 NULL +102 57896044618658097711785492504343953926634992332820282019728792003956564819967 +103 NULL +104 NULL +105 NULL +2 57896044618658097711785492504343953926634992332820282019728792003956564819967 +3 NULL +4 NULL +5 NULL + +system ok +psql -c " +DELETE FROM numeric_table WHERE id IN (102, 103, 104, 105); +" diff --git a/e2e_test/source/cdc/cdc.validate.postgres.slt b/e2e_test/source/cdc/cdc.validate.postgres.slt index 4ac6669913c58..1cf42983de49a 100644 --- a/e2e_test/source/cdc/cdc.validate.postgres.slt +++ b/e2e_test/source/cdc/cdc.validate.postgres.slt @@ -167,3 +167,37 @@ create table shipments ( table.name = 'shipments', slot.name = 'shipments' ) format canal encode csv; + +statement ok +explain create table numeric_to_rw_int256 ( + id int, + num rw_int256, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = 'db', + port = '5432', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = 'cdc_test', + schema.name = 'public', + table.name = 'numeric_table', + slot.name = 'numeric_to_rw_int256' +); + +statement ok +explain create table numeric_to_varchar ( + id int, + num varchar, + PRIMARY KEY (id) +) with ( + connector = 'postgres-cdc', + hostname = 'db', + port = '5432', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = 'cdc_test', + schema.name = 'public', + table.name = 'numeric_table', + slot.name = 'numeric_to_varchar' +); diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index a4de0e447a0cc..9364bb7dcf4a2 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -72,3 +72,13 @@ CREATE TABLE IF NOT EXISTS postgres_all_types( ); INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null); INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e'); + +create table numeric_table(id int PRIMARY KEY, num numeric); +insert into numeric_table values(1, 3.14); +--- 2^255 - 1 +insert into numeric_table values(2, 57896044618658097711785492504343953926634992332820282019728792003956564819967); +--- 2^255 +insert into numeric_table values(3, 57896044618658097711785492504343953926634992332820282019728792003956564819968); +--- 2^256 +insert into numeric_table values(4, 115792089237316195423570985008687907853269984665640564039457584007913129639936); +insert into numeric_table values(5, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555); diff --git a/e2e_test/source/cdc/postgres_cdc_insert.sql b/e2e_test/source/cdc/postgres_cdc_insert.sql index 84ae9068c187a..6168b91d67a47 100644 --- a/e2e_test/source/cdc/postgres_cdc_insert.sql +++ b/e2e_test/source/cdc/postgres_cdc_insert.sql @@ -11,3 +11,10 @@ insert into abs.t1 values (2, 2.2, 'bbb', '1234.5431'); SELECT pg_current_wal_lsn(); select * from pg_publication_tables where pubname='rw_publication'; select * from public.person order by id; + +insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967); +--- 2^255 +insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968); +--- 2^256 +insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936); +insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555); diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 6d14f59bf8cbe..2ca6c25a2d9c2 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -144,9 +144,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } DataType::Int256 => postgres_decimal_to_rw_int256(&row, i, name, None), DataType::Varchar => { - match row.columns()[i].type_() { + match *row.columns()[i].type_() { // Since we don't support UUID natively, adapt it to a VARCHAR column - &Type::UUID => { + Type::UUID => { let res = row.try_get::<_, Option>(i); match res { Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), @@ -164,7 +164,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } // support converting NUMERIC to VARCHAR implicitly - &Type::NUMERIC => postgres_decimal_to_varchar(&row, i, name, None), + Type::NUMERIC => postgres_decimal_to_varchar(&row, i, name, None), _ => { handle_data_type!(row, i, name, String) } @@ -233,9 +233,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O handle_list_data_type!(row, i, name, NaiveDate, builder, Date); } DataType::Varchar => { - match row.columns()[i].type_() { + match *row.columns()[i].type_() { // Since we don't support UUID natively, adapt it to a VARCHAR column - &Type::UUID => { + Type::UUID => { let res = row.try_get::<_, Option>(i); match res { Ok(val) => { @@ -257,7 +257,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O }; } // support converting NUMERIC to VARCHAR implicitly - &Type::NUMERIC => { + Type::NUMERIC => { let _ = postgres_decimal_to_varchar(&row, i, name, None); } _ => { @@ -390,8 +390,8 @@ fn postgres_decimal_to_rw_int256( let string = postgres_decimal_to_string(row, idx, name)?; match Int256::from_str(string.as_str()) { Ok(num) => { - if builder.is_some() { - builder.unwrap().append(Some(ScalarImpl::from(num.clone()))); + if let Some(builder) = builder { + builder.append(Some(ScalarImpl::from(num.clone()))); None } else { Some(ScalarImpl::from(num)) @@ -421,8 +421,8 @@ fn postgres_decimal_to_varchar( // we use the PgNumeric type to convert the decimal to a string. let string = postgres_decimal_to_string(row, idx, name)?; - if builder.is_some() { - builder.unwrap().append(Some(ScalarImpl::from(string))); + if let Some(builder) = builder { + builder.append(Some(ScalarImpl::from(string))); None } else { Some(ScalarImpl::from(string))