Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support batch s3 parquet frontend part #17625

Merged
merged 10 commits into from
Jul 12, 2024

Conversation

chenzl25
Copy link
Contributor

@chenzl25 chenzl25 commented Jul 9, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Related issue: Feat: Batch ingest iceberg/file source #14742
  • Support the frontend part of batch s3 parquet file table function. select * from file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_localtion). First, the planner will create a LogicalTableFunction. Secondly, LogicalTableFunction would be transformed into LogiacalFileScan by TableFunctionToFileScanRule. Finally, LogicalFileScan would be transformed into a BatchFileScan. To avoid the PR from being too large, the batch fragmentation and scheduler part would be implemented in another PR later.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@chenzl25 chenzl25 requested a review from a team as a code owner July 9, 2024 07:03
Comment on lines +135 to +157
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let parquet_stream_builder = create_parquet_stream_builder(
eval_args[2].clone(),
eval_args[3].clone(),
eval_args[4].clone(),
eval_args[5].clone(),
)
.await?;

let mut rw_types = vec![];
for field in parquet_stream_builder.schema().fields() {
rw_types.push((
field.name().to_string(),
IcebergArrowConvert.type_from_field(field)?,
));
}

Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
StructType::new(rw_types),
))
})
})?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Derive the schema from a parquet file in the planner.

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

src/frontend/src/expr/table_function.rs Outdated Show resolved Hide resolved
@chenzl25
Copy link
Contributor Author

chenzl25 commented Jul 10, 2024

@wangrunji0408 the deterministic test reports the following error. Does it mean that we can't use block_in_place?

error[E0425]: cannot find function `block_in_place` in module `tokio::task`
  | --> src/frontend/src/expr/table_function.rs:135:26
  | \|
  | 135 \|             tokio::task::block_in_place(\|\| {
  | \|                          ^^^^^^^^^^^^^^ not found in `tokio::task`

@wangrunji0408
Copy link
Contributor

@wangrunji0408 the deterministic test reports the following error. Does it mean that we can't use block_in_place?

error[E0425]: cannot find function `block_in_place` in module `tokio::task`
  | --> src/frontend/src/expr/table_function.rs:135:26
  | \|
  | 135 \|             tokio::task::block_in_place(\|\| {
  | \|                          ^^^^^^^^^^^^^^ not found in `tokio::task`

No, we can't use block_in_place in simulation because there will be only one thread.
Why is block_in_place necessary here?

@chenzl25
Copy link
Contributor Author

chenzl25 commented Jul 10, 2024

@wangrunji0408 the deterministic test reports the following error. Does it mean that we can't use block_in_place?

error[E0425]: cannot find function `block_in_place` in module `tokio::task`
  | --> src/frontend/src/expr/table_function.rs:135:26
  | \|
  | 135 \|             tokio::task::block_in_place(\|\| {
  | \|                          ^^^^^^^^^^^^^^ not found in `tokio::task`

No, we can't use block_in_place in simulation because there will be only one thread. Why is block_in_place necessary here?

I want to call an async function and fetch its return type with a non-async function. Do you have any suggestions for achieving this goal?

@BugenZhao
Copy link
Member

I want to call an async function and fetch its return type with a non-async function.

Not sure if it's a good practice to include asynchronous logic when creating an expression. 😢

@chenzl25
Copy link
Contributor Author

I want to call an async function and fetch its return type with a non-async function.

Not sure if it's a good practice to include asynchronous logic when creating an expression. 😢

True, but I don't have a better idea, because we need to fetch data to determine the schema for a LogicalPlan while the planner can't go across await points.

Comment on lines +81 to +83
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: Will we support column pruning on the FileSource? This might be a necessary feature for columnar storage formats like Parquet, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is doable 🤔

batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can support it later.

@chenzl25 chenzl25 added this pull request to the merge queue Jul 12, 2024
Merged via the queue into main with commit 45c9e2b Jul 12, 2024
31 of 32 checks passed
@chenzl25 chenzl25 deleted the dylan/support_batch_s3_parquet_frontend branch July 12, 2024 08:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants