feat(connector): introduce parquet file source #17201

Merged on Jul 12, 2024 (40 commits). Changes shown below are from all commits.

Commits (40):
43bb0da save work (wcy-fdu, Apr 25, 2024)
841f487 todo: add extract_parquet_table_schema (wcy-fdu, Apr 28, 2024)
80410b6 save work (wcy-fdu, Apr 28, 2024)
bb32774 save work (wcy-fdu, May 6, 2024)
9371fd1 save work (wcy-fdu, May 29, 2024)
843f622 save work (wcy-fdu, May 30, 2024)
d36deb0 save work (wcy-fdu, Jun 3, 2024)
aecb5e5 finish framework, do not use parser (wcy-fdu, Jun 6, 2024)
5e5589f handle error (wcy-fdu, Jun 6, 2024)
a6dcded save work, can load data from parquet (wcy-fdu, Jun 7, 2024)
8f9325b parquet file source can work, todo: add e2e test (wcy-fdu, Jun 11, 2024)
bff6ff6 delete py package (wcy-fdu, Jun 11, 2024)
bf7fdff minor (wcy-fdu, Jun 11, 2024)
68a0171 remove parquet parser (wcy-fdu, Jun 11, 2024)
3612e4b adding ci, save work (wcy-fdu, Jun 12, 2024)
21ad23a add ci (wcy-fdu, Jun 14, 2024)
9a49a23 minor (wcy-fdu, Jun 14, 2024)
28769c3 fix ci (wcy-fdu, Jun 17, 2024)
ca9f722 minor (wcy-fdu, Jun 17, 2024)
dde7952 minor (wcy-fdu, Jun 21, 2024)
efdeaa3 resolve some comments (wcy-fdu, Jun 27, 2024)
38a0628 save wprl (wcy-fdu, Jul 4, 2024)
0726fd1 resolve comments, introduce ParquetParser (wcy-fdu, Jul 5, 2024)
d71ee2b save work (wcy-fdu, Jul 8, 2024)
c4b2024 resolve conflict (wcy-fdu, Jul 8, 2024)
09b6b80 resolve typo (wcy-fdu, Jul 8, 2024)
0835398 enhance comments (wcy-fdu, Jul 8, 2024)
6b3a0c4 fix dylink (wcy-fdu, Jul 8, 2024)
1a87a96 fmt (wcy-fdu, Jul 9, 2024)
c57795c use tokio_util to convert futures::AsyncRead and tokio::AsyncRead (wcy-fdu, Jul 10, 2024)
fa24c40 use split.offset (wcy-fdu, Jul 10, 2024)
8bd0d56 resolve comments (wcy-fdu, Jul 10, 2024)
4e8d84d enhance ci (wcy-fdu, Jul 10, 2024)
6c21157 fix row offset (wcy-fdu, Jul 11, 2024)
09f7150 rebase main (wcy-fdu, Jul 11, 2024)
0eb2060 minor (wcy-fdu, Jul 11, 2024)
28dcc17 fix offset, one row use one offset (wcy-fdu, Jul 11, 2024)
a960216 minor (wcy-fdu, Jul 12, 2024)
80c9d87 minor (wcy-fdu, Jul 12, 2024)
f4a05b3 fmt (wcy-fdu, Jul 12, 2024)
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test.sh
@@ -32,7 +32,7 @@ echo "--- starting risingwave cluster with connector node"
 risedev ci-start ci-1cn-1fe

 echo "--- Run test"
-python3 -m pip install --break-system-packages minio psycopg2-binary opendal
+python3 -m pip install --break-system-packages minio psycopg2-binary opendal pandas
 if [[ -v format_type ]]; then
   python3 e2e_test/s3/"$script" "$format_type"
 else
22 changes: 22 additions & 0 deletions ci/workflows/main-cron.yml
@@ -478,6 +478,28 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry

+  - label: "S3_v2 source check on parquet file"
+    key: "s3-v2-source-check-parquet-file"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-s3-source-tests"
+      || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
+    depends_on: build
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
+      - docker-compose#v5.1.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - S3_SOURCE_TEST_CONF
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 25
+    retry: *auto-retry
+
   - label: "S3_v2 source batch read on AWS (json parser)"
     key: "s3-v2-source-batch-read-check-aws-json-parser"
     command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
137 changes: 137 additions & 0 deletions e2e_test/s3/fs_parquet_source.py
@@ -0,0 +1,137 @@
import os
import sys
import random
import psycopg2
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timezone
from time import sleep
from minio import Minio
from random import uniform

def gen_data(file_num, item_num_per_file):
    assert item_num_per_file % 2 == 0, \
        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
    return [
        [{
            'id': file_id * item_num_per_file + item_id,
            'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}',
            'sex': item_id % 2,
            'mark': (-1) ** (item_id % 2),
            'test_int': pa.scalar(1, type=pa.int32()),
            'test_real': pa.scalar(4.0, type=pa.float32()),
            'test_double_precision': pa.scalar(5.0, type=pa.float64()),
            'test_varchar': pa.scalar('7', type=pa.string()),
            'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
            'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
            'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
            'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
            'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
        } for item_id in range(item_num_per_file)]
        for file_id in range(file_num)
    ]

def do_test(config, file_num, item_num_per_file, prefix):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    def _table():
        return 's3_test_parquet'

    # Create a table backed by the S3 parquet source
    cur.execute(f'''CREATE TABLE {_table()}(
        id bigint primary key,
        name TEXT,
        sex bigint,
        mark bigint,
        test_int int,
        test_real real,
        test_double_precision double precision,
        test_varchar varchar,
        test_bytea bytea,
        test_date date,
        test_time time,
        test_timestamp timestamp,
        test_timestamptz timestamptz,
    ) WITH (
        connector = 's3_v2',
        match_pattern = '*.parquet',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) FORMAT PLAIN ENCODE PARQUET;''')

    total_rows = file_num * item_num_per_file
    MAX_RETRIES = 40
    for retry_no in range(MAX_RETRIES):
        cur.execute(f'select count(*) from {_table()}')
        result = cur.fetchone()
        if result[0] == total_rows:
            break
        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
        sleep(10)

    stmt = f'select count(*), sum(id) from {_table()}'
    print(f'Execute {stmt}')
    cur.execute(stmt)
    result = cur.fetchone()

    print('Got:', result)

    def _assert_eq(field, got, expect):
        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

    _assert_eq('count(*)', result[0], total_rows)
    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

    print('Test pass')

    cur.execute(f'drop table {_table()}')
    cur.close()
    conn.close()


if __name__ == "__main__":
    FILE_NUM = 10
    ITEM_NUM_PER_FILE = 2000
    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    client = Minio(
        config["S3_ENDPOINT"],
        access_key=config["S3_ACCESS_KEY"],
        secret_key=config["S3_SECRET_KEY"],
        secure=True,
    )
    run_id = str(random.randint(1000, 9999))
    _local = lambda idx: f'data_{idx}.parquet'
    _s3 = lambda idx: f"{run_id}_data_{idx}.parquet"

    # put s3 files
    for idx, file_data in enumerate(data):
        table = pa.Table.from_pandas(pd.DataFrame(file_data))
        pq.write_table(table, _local(idx))

        client.fput_object(
            config["S3_BUCKET"],
            _s3(idx),
            _local(idx)
        )

    # do test
    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

    # clean up s3 files
    for idx, _ in enumerate(data):
        client.remove_object(config["S3_BUCKET"], _s3(idx))
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -150,6 +150,7 @@ enum EncodeType {
   ENCODE_TYPE_TEMPLATE = 7;
   ENCODE_TYPE_NONE = 8;
   ENCODE_TYPE_TEXT = 9;
+  ENCODE_TYPE_PARQUET = 10;
 }

 enum RowFormatType {
12 changes: 11 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -524,9 +524,12 @@ pub trait FromArrow {
             Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
             Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
             Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
-            Timestamp(Microsecond, _) => {
+            Timestamp(Microsecond, None) => {
                 self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
             }
+            Timestamp(Microsecond, Some(_)) => {
+                self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
+            }
             Interval(MonthDayNano) => {
                 self.from_interval_array(array.as_any().downcast_ref().unwrap())
             }
@@ -628,6 +631,13 @@ pub trait FromArrow {
         Ok(ArrayImpl::Timestamp(array.into()))
     }

+    fn from_timestampus_some_array(
+        &self,
+        array: &arrow_array::TimestampMicrosecondArray,
+    ) -> Result<ArrayImpl, ArrayError> {
+        Ok(ArrayImpl::Timestamptz(array.into()))
+    }
+
     fn from_interval_array(
         &self,
         array: &arrow_array::IntervalMonthDayNanoArray,
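
Arrow stores zoned and naive timestamps in the same physical `i64` array; only the `DataType` metadata distinguishes them, which is why the match above needs separate `None` and `Some(_)` arms. A standalone sketch against the `arrow-array`/`arrow-schema` crates (not RisingWave code) showing how the two cases are told apart:

    use arrow_array::{Array, TimestampMicrosecondArray};
    use arrow_schema::{DataType, TimeUnit};

    fn main() {
        // Identical microsecond values; only the timezone metadata differs.
        let naive = TimestampMicrosecondArray::from(vec![1_720_000_000_000_000_i64]);
        let zoned = TimestampMicrosecondArray::from(vec![1_720_000_000_000_000_i64])
            .with_timezone("+00:00");

        for array in [&naive, &zoned] {
            match array.data_type() {
                // No timezone: maps to a plain `timestamp` column.
                DataType::Timestamp(TimeUnit::Microsecond, None) => println!("timestamp"),
                // Any timezone: maps to `timestamptz` instead.
                DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => println!("timestamptz"),
                other => println!("unhandled: {other:?}"),
            }
        }
    }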
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -93,6 +93,7 @@ opendal = { workspace = true, features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
+parquet = { workspace = true }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
6 changes: 5 additions & 1 deletion src/connector/src/error.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

 use risingwave_common::array::ArrayError;
 use risingwave_common::error::def_anyhow_newtype;
 use risingwave_pb::PbFieldNotFound;
 use risingwave_rpc_client::error::RpcError;
@@ -41,17 +42,20 @@ def_anyhow_newtype! {
     url::ParseError => "failed to parse url",
     serde_json::Error => "failed to parse json",
     csv::Error => "failed to parse csv",

     uuid::Error => transparent, // believed to be self-explanatory

     // Connector errors
     opendal::Error => transparent, // believed to be self-explanatory

     parquet::errors::ParquetError => transparent,
     ArrayError => "Array error",
     sqlx::Error => transparent, // believed to be self-explanatory
     mysql_async::Error => "MySQL error",
     tokio_postgres::Error => "Postgres error",
     apache_avro::Error => "Avro error",
     rdkafka::error::KafkaError => "Kafka error",
     pulsar::Error => "Pulsar error",

     async_nats::jetstream::consumer::StreamError => "Nats error",
     async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
     async_nats::jetstream::context::CreateStreamError => "Nats error",
4 changes: 4 additions & 0 deletions src/connector/src/parser/mod.rs
@@ -24,6 +24,7 @@ pub use debezium::*;
 use futures::{Future, TryFutureExt};
 use futures_async_stream::try_stream;
 pub use json_parser::*;
+pub use parquet_parser::ParquetParser;
 pub use protobuf::*;
 use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
 use risingwave_common::bail;
@@ -76,6 +77,7 @@ mod debezium;
 mod json_parser;
 mod maxwell;
 mod mysql;
+pub mod parquet_parser;
 pub mod plain_parser;
 mod postgres;

@@ -1117,6 +1119,7 @@ pub enum EncodingProperties {
     Json(JsonProperties),
     MongoJson,
     Bytes(BytesProperties),
+    Parquet,
     Native,
     /// Encoding can't be specified because the source will determine it. Now only used in Iceberg.
     None,
@@ -1170,6 +1173,7 @@ impl SpecificParserConfig {
                 delimiter: info.csv_delimiter as u8,
                 has_header: info.csv_has_header,
             }),
+            (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
             (SourceFormat::Plain, SourceEncode::Avro)
             | (SourceFormat::Upsert, SourceEncode::Avro) => {
                 let mut config = AvroProperties {
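
Unlike the byte-oriented parsers in this module, parquet is consumed as arrow `RecordBatch`es rather than raw bytes, which is why `EncodingProperties::Parquet` carries no parser options. A hedged sketch of the underlying read path using the `parquet` crate's async arrow reader on a local file (the connector feeds it an S3-backed reader instead; `read_batches` is an illustrative name, not the connector's API):

    use futures::TryStreamExt;
    use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
    use tokio::fs::File;

    // Stream a parquet file as arrow RecordBatches. Anything satisfying the
    // parquet crate's `AsyncFileReader` bound (tokio's AsyncRead + AsyncSeek
    // works out of the box) can feed the builder.
    async fn read_batches(path: &str) -> anyhow::Result<()> {
        let file = File::open(path).await?;
        let stream = ParquetRecordBatchStreamBuilder::new(file)
            .await? // reads the footer and schema metadata
            .with_batch_size(1024) // rows per RecordBatch
            .build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        for batch in &batches {
            println!("{} rows x {} cols", batch.num_rows(), batch.num_columns());
        }
        Ok(())
    }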