
feat(sink): introduce file sink in PARQUET format #17311

Merged · 86 commits · Aug 22, 2024
Conversation

@wcy-fdu (Contributor) commented Jun 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

close #15431

This PR introduces a basic file sink, which allows RisingWave to sink data into S3 or other file systems (currently S3 and GCS) in Parquet format.
As sink decoupling is not enabled yet, we force a file write every time a checkpoint barrier arrives; that is, between two checkpoint barriers, parallelism files are written (one per parallel writer). To distinguish the written files, the current naming convention is epoch_executor_id.suffix.

After this PR is merged, we will introduce a file sink batching strategy according to specific requirements, that is, enable sink decoupling. In addition, more sink file types will be introduced.
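
As an illustration of the naming convention above, here is a minimal sketch; the helper name and parameter types are hypothetical, not the PR's actual code:

fn parquet_file_name(epoch: u64, executor_id: u64) -> String {
    // One file per parallel writer per checkpoint: <epoch>_<executor_id>.parquet
    format!("{}_{}.parquet", epoch, executor_id)
}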

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: Sql feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

To be edited (after all comments are resolved)

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@wcy-fdu wcy-fdu requested a review from a team as a code owner June 18, 2024 08:33
@wcy-fdu wcy-fdu marked this pull request as draft June 18, 2024 08:33
@wcy-fdu wcy-fdu marked this pull request as ready for review July 3, 2024 10:19
@wcy-fdu wcy-fdu requested review from xxhZs, hzxa21 and wenym1 July 3, 2024 10:19
@hzxa21 (Collaborator) left a comment:


Also, is the TODO specified in the PR description done?

(Outdated review threads, all resolved: proto/connector_service.proto, src/connector/src/sink/encoder/mod.rs, src/connector/src/sink/formatter/mod.rs, src/connector/src/sink/mod.rs, src/connector/Cargo.toml, src/connector/src/sink/file_sink/mod.rs.)
Comment on lines 189 to 191:

let filters =
    chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
// Mask out non-Insert (update/delete) rows via the visibility bitmap.
chunk.set_visibility(filters);
Collaborator:

Hmmm, this seems to implicitly convert a retractable stream chunk into an append-only stream chunk. Is it possible to receive a stream chunk with op != Op::Insert here? If yes, I feel we should raise an error asking the user to set force_append_only, and only apply this filtering when that option is set. If not, we should add an assertion here instead of the filter.

@wcy-fdu (Contributor Author) replied:

Currently, for the file sink, the user needs to explicitly set force_append_only=true.
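
A minimal sketch of the guard being discussed; the function and parameter names are hypothetical, not the PR's actual code:

fn check_append_only(input_append_only: bool, force_append_only: bool) -> Result<(), String> {
    // Reject retractable input unless the user explicitly opted in to
    // dropping UPDATE/DELETE rows.
    if !input_append_only && !force_append_only {
        return Err(
            "file sink only supports append-only mode; set force_append_only='true' \
             to silently drop non-Insert rows"
                .to_string(),
        );
    }
    Ok(())
}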

let mut schema_fields = HashMap::new();
rw_schema.fields.iter().for_each(|field| {
    let res = schema_fields.insert(&field.name, &field.data_type);
    // This assert is to make sure there is no duplicate field name in the schema.
    assert!(res.is_none());
});
Collaborator:

Should we ensure this in validate?

@wcy-fdu (Contributor Author) replied on Jul 29, 2024:

I think validate is only used to verify whether a connection can be established with the downstream sink; for example, for a file sink, it verifies whether a file can be written normally.
The rw_schema here is defined by the user's CREATE SINK statement and should be verified on the frontend to some extent, and I notice that other sinks also just take the schema directly from the param without doing any validation, so I'd like to keep the current implementation. Feel free to push back if you have stronger reasons.
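
For illustration, a connectivity-only validate along those lines might look like the following sketch (hypothetical, assuming the opendal Operator the file sink is built on; not the PR's actual code):

async fn validate(op: &opendal::Operator) -> anyhow::Result<()> {
    // Probe that the target path is writable; schema checks stay on the frontend.
    op.write("_rw_sink_probe", vec![b'o', b'k']).await?;
    op.delete("_rw_sink_probe").await?;
    Ok(())
}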

(Another outdated review thread on src/connector/src/sink/file_sink/mod.rs, resolved.)
@hzxa21 (Collaborator) commented Jul 4, 2024:

It seems that there are many conflicts. I suggest we resolve the conflicts first.

@wcy-fdu (Contributor Author) commented Jul 4, 2024:

Also, is the TODO specified in the PR description done?

Not yet; I will add CI/e2e tests after the parquet source is merged.

@tabVersion tabVersion self-requested a review July 8, 2024 06:11
@wcy-fdu (Contributor Author) commented Aug 15, 2024:

debug logging:

I think the CN exiting with code 139 is related to the newly added code:

                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>},
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>  },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>  },

If any one of these three sinks appears alone, there is no problem, but with two or more of them the 139 exit code appears.
I have reason to suspect that the problem is caused by the FileSink struct and its associated type OpendalSinkBackend, but I have no clue at the moment.

Maybe there is shared mutable state between the multiple sinks, or maybe it is an initialization problem.
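
For context, here is a minimal, self-contained sketch of the dispatch pattern involved (hypothetical and heavily simplified relative to RisingWave's dispatch_sink!): each match arm monomorphizes the body for one concrete sink type, so the expanded code grows with the number of variants.

struct S3Sink;
struct GcsSink;
struct FsSink;

enum SinkImpl {
    S3(S3Sink),
    Gcs(GcsSink),
    Fs(FsSink),
}

macro_rules! dispatch_sink {
    ($impl:expr, $sink:ident, $body:expr) => {
        match $impl {
            // The body is duplicated (and monomorphized) once per arm.
            SinkImpl::S3($sink) => $body,
            SinkImpl::Gcs($sink) => $body,
            SinkImpl::Fs($sink) => $body,
        }
    };
}

fn main() {
    let sink = SinkImpl::Gcs(GcsSink);
    let label = dispatch_sink!(sink, s, {
        let _ = s; // each arm sees `s` at its own concrete type
        "dispatched"
    });
    println!("{label}");
}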

@wcy-fdu (Contributor Author) commented Aug 21, 2024:

Many thanks to @wenym1; we finally resolved the mysterious 139 (segmentation fault) issue. Let me briefly summarize what we did and what we suspect.

The issue mainly occurred around the dispatch_sink! macro. Although this PR only adds three file sink types, I suspect the macro expansion has become quite large, putting it on the edge of a stack overflow. I suspect this because even when I individually boxed one or more fields of the newly added FileSink struct, the issue was not resolved, so it may not be due to FileSink itself.

Here, let's take a look at the key changes.

  • This code will generate a 139 segmentation fault:
dispatch_sink!(self.sink, sink, {
    let consume_log_stream = Self::execute_consume_log(
        *sink,
        log_reader,
        self.input_columns,
        self.sink_param,
        self.sink_writer_param,
        self.actor_context,
    )
    .instrument_await(format!("consume_log (sink_id {sink_id})"))
    .map_ok(|never| match never {}); // unify return type to `Message`

    select(consume_log_stream.into_stream(), write_log_stream).boxed()
})
  • This code will not generate a 139 segmentation fault:
let consume_log_stream_future = dispatch_sink!(self.sink, sink, {
    let consume_log_stream = Self::execute_consume_log(
        *sink,
        log_reader,
        self.input_columns,
        self.sink_param,
        self.sink_writer_param,
        self.actor_context,
    )
    .instrument_await(format!("consume_log (sink_id {sink_id})"))
    .map_ok(|never| match never {}); // unify return type to `Message`

    consume_log_stream.boxed()
});
select(consume_log_stream_future.into_stream(), write_log_stream)

As you can see, the difference essentially lies between select(A, B).boxed() and select(A.boxed(), B). Initially, I thought the performance impact of the two would be similar. However, since A is a macro expansion, one possible explanation is that the former generates n monomorphized selects, one per macro arm, while boxing inside the macro first results in only one select. As the content of the macro (n) grows larger, the former leads to a segmentation fault.
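
A minimal, self-contained sketch of the underlying effect (not the actual RisingWave code): an async block's state machine is sized by what it holds across await points, and boxing it first shrinks what any outer combinator must embed inline.

fn main() {
    let big = async {
        // 1 MiB held across an await point becomes part of the future's state.
        let buf = [0u8; 1 << 20];
        std::future::ready(()).await;
        buf.len()
    };
    // Any combinator built around `big` by value embeds its ~1 MiB state
    // inline, wherever that combinator is constructed (often the stack).
    println!("inline: {} bytes", std::mem::size_of_val(&big));

    // Boxing first moves the state to the heap; only a pointer-sized handle
    // remains for outer combinators like `select` to hold.
    let boxed = Box::pin(big);
    println!("boxed: {} bytes", std::mem::size_of_val(&boxed));
}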

Note that this is just a suspicion; despite it, I still don't fully understand:

  1. why the segmentation fault occurs during jdbc.load (perhaps the communication with JNI at that point causes a surge in memory usage)
  2. why a single file sink does not cause a segmentation fault, but two or more do. Is it really a coincidence that adding a second one hits the critical point?

Anyway, we found a solution and developed a methodology. The common causes of segmentation faults typically include:

  • Stack Overflow:
    Due to deep recursive calls or allocating large data structures on the stack.
  • Uninitialized Memory:
    Accessing uninitialized variables or memory, leading to reading invalid addresses.
  • Unsafe Code:
    Errors in dereferencing pointers or accessing invalid memory within unsafe blocks.
  • Data Races:
    Improper synchronization of shared data access in a multithreaded environment.
  • External Library Issues:
    Bugs in third-party libraries that cause memory access errors.
  • Circular References:
    Circular references in data structures may lead to memory leaks and stack overflow.

That said, if a Rust system encounters a 139 segmentation fault, it is likely because some allocation on the stack is too large. In such cases, wrapping potentially oversized fields in Box may be a way to address the issue.
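
A small, hypothetical illustration of that remedy (not code from this PR): boxing a large field keeps only a pointer inline.

#[allow(dead_code)]
struct Inline {
    // 1 MiB lives wherever an `Inline` value is placed (e.g., the stack).
    buf: [u8; 1 << 20],
}

#[allow(dead_code)]
struct Boxed {
    // Only a pointer lives inline; the 1 MiB goes to the heap.
    buf: Box<[u8; 1 << 20]>,
}

fn main() {
    println!("Inline: {} bytes", std::mem::size_of::<Inline>()); // 1048576
    println!("Boxed:  {} bytes", std::mem::size_of::<Boxed>()); // 8 on 64-bit targets
}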

@BugenZhao (Member) commented:

While if a rust system encounters a 139 segmentation fault, it is likely due to some allocations on the stack being too large.

You can attach a debugger to the process and it will show the reason for any exceptions (like segmentation fault). Typically it's stack overflow, and by checking the backtrace we can find where it happens.

@wcy-fdu (Contributor Author) commented Aug 22, 2024:

While if a rust system encounters a 139 segmentation fault, it is likely due to some allocations on the stack being too large.

You can attach a debugger to the process and it will show the reason for any exceptions (like segmentation fault). Typically it's stack overflow, and by checking the backtrace we can find where it happens.

Thank you for your explanation! Can we attach a debugger in our CI? This issue cannot be reproduced locally; it only happens in CI.

Development

Successfully merging this pull request may close these issues.

feat: support S3 sink
5 participants