Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): unified message parser #10096

Merged
merged 34 commits into from
Jun 15, 2023
Merged

refactor(source): unified message parser #10096

merged 34 commits into from
Jun 15, 2023

Conversation

algosday
Copy link
Contributor

@algosday algosday commented May 31, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

We have supported many combinations in our source connector, such as:

  • avro,json + kafka-upsert/append-only
  • avro,json + debezium

The avro, json is the parsing format, which indicates how the string/binary message is mapping to the object.

The upsert/append-only/debezium indicates how the parsed object is mapping to the ChangeEvent ( insert, update or delete).

In current implamentation, we almost have a dedicated parser for each combination, handing from format parsing to rows changing ( insert/delete/update rows). It's hard to maintain them.

This PR refactors the source parser carefully, splits into two layers.

  • Access Layer. Provides an abstract which can access the deep nested value by path directly without parsing things ahead. No matter how messages are encoded, delay the actual parsing stuff until it is accessed.
    • AvroAccess, JsonAccess. Perform the actual parsing operations.
  • CDC Layer. This abstraction facilitates the mapping of the messages to the row operations, regardless of the encoding or structure of the messages. (debezium/kafka-upsert...)

And some adapters to combine them together.

Checklist For Contributors

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • I have demonstrated that backward compatibility is not broken by breaking changes and created issues to track deprecated features to be removed in the future. (Please refer to the issue)
  • All checks passed in ./risedev check (or alias, ./risedev c)

Checklist For Reviewers

  • I have requested macro/micro-benchmarks as this PR can affect performance substantially, and the results are shown.

Documentation

  • My PR DOES NOT contain user-facing changes.
Click here for Documentation

Types of user-facing changes

Please keep the types that apply to your changes, and remove the others.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

@algosday algosday marked this pull request as draft May 31, 2023 06:18
Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 3547 files.

Valid Invalid Ignored Fixed
1575 3 1969 0
Click to see the invalid file list
  • src/connector/src/parser/unified/avro.rs
  • src/connector/src/parser/unified/json.rs
  • src/connector/src/parser/unified/mod.rs

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 3547 files.

Valid Invalid Ignored Fixed
1575 3 1969 0
Click to see the invalid file list
  • src/connector/src/parser/unified/avro.rs
  • src/connector/src/parser/unified/json.rs
  • src/connector/src/parser/unified/mod.rs

src/connector/src/parser/unified/avro.rs Show resolved Hide resolved
src/connector/src/parser/unified/json.rs Show resolved Hide resolved
src/connector/src/parser/unified/mod.rs Show resolved Hide resolved
@algosday algosday marked this pull request as ready for review May 31, 2023 06:33
@algosday algosday marked this pull request as draft May 31, 2023 06:33
@algosday algosday marked this pull request as ready for review June 1, 2023 07:47
@fuyufjh
Copy link
Member

fuyufjh commented Jun 5, 2023

Can you please improve the PR title & description and add more comments in your code?

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, please run ./risedev check and fix the issues

use crate::parser::schema_registry::{extract_schema_id, Client};
use crate::parser::schema_resolver::ConfluentSchemaResolver;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::debezium::DebeziumAdapter;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::util::get_kafka_topic;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::{SourceColumnDesc, SourceContextRef};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 46:

// TODO: avoid duplicated codes with `AvroParser`

Is this completed in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I found the duplicated codes for parsing is done in this PR. But there are still some duplicated codes for handling schema fetching.

src/connector/src/parser/unified/mod.rs Outdated Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added some comments in the code according to my understanding. Please correct me if wrong.

@fuyufjh fuyufjh changed the title unified refactor(source): unified message parser Jun 5, 2023
@fuyufjh
Copy link
Member

fuyufjh commented Jun 5, 2023

Can you please improve the PR title & description and add more comments in your code?

Please be careful when using Copilot. For this PR I think the most critical information for reviewers is what "unified" means, which is not pointed out by Copilot.

src/connector/src/parser/unified/mod.rs Outdated Show resolved Hide resolved
@@ -263,7 +294,7 @@ mod tests {
async fn test_json_parse_object_top_level() {
test_json_parser(get_payload).await;
}

#[ignore]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please leave a todo and reopen it in future

src/connector/src/parser/avro/parser.rs Outdated Show resolved Hide resolved
src/connector/src/parser/unified/json.rs Outdated Show resolved Hide resolved
src/connector/src/parser/unified/json.rs Show resolved Hide resolved
src/connector/src/parser/unified/json.rs Show resolved Hide resolved
src/connector/src/parser/unified/json.rs Show resolved Hide resolved
@algosday
Copy link
Contributor Author

algosday commented Jun 6, 2023

Can you please improve the PR title & description and add more comments in your code?

Please be careful when using Copilot. For this PR I think the most critical information for reviewers is what "unified" means, which is not pointed out by Copilot.

I have writing the description by hand. Explain the motivation and the design.

@codecov
Copy link

codecov bot commented Jun 12, 2023

Codecov Report

Merging #10096 (1b49c15) into main (5cf94c9) will increase coverage by 0.02%.
The diff coverage is 68.95%.

@@            Coverage Diff             @@
##             main   #10096      +/-   ##
==========================================
+ Coverage   70.51%   70.53%   +0.02%     
==========================================
  Files        1248     1255       +7     
  Lines      213868   213772      -96     
==========================================
- Hits       150811   150789      -22     
+ Misses      63057    62983      -74     
Flag Coverage Δ
rust 70.53% <68.95%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/connector/src/parser/common.rs 100.00% <ø> (+26.01%) ⬆️
src/connector/src/parser/debezium/avro_parser.rs 41.41% <0.00%> (+4.47%) ⬆️
src/connector/src/parser/mod.rs 38.84% <ø> (-5.39%) ⬇️
src/connector/src/parser/unified/avro.rs 52.90% <52.90%> (ø)
src/connector/src/parser/avro/parser.rs 64.46% <58.33%> (+5.52%) ⬆️
src/connector/src/parser/canal/simd_json_parser.rs 83.72% <58.82%> (+4.16%) ⬆️
src/connector/src/parser/unified/maxwell.rs 63.15% <63.15%> (ø)
src/connector/src/parser/unified/util.rs 65.21% <65.21%> (ø)
src/connector/src/parser/unified/debezium.rs 65.90% <65.90%> (ø)
src/connector/src/parser/unified/json.rs 72.35% <72.35%> (ø)
... and 7 more

... and 7 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@github-actions github-actions bot added type/refactor user-facing-changes Contains changes that are visible to users labels Jun 15, 2023
@tabVersion tabVersion removed the user-facing-changes Contains changes that are visible to users label Jun 15, 2023
@github-actions github-actions bot added the user-facing-changes Contains changes that are visible to users label Jun 15, 2023
src/connector/Cargo.toml Show resolved Hide resolved
src/connector/src/parser/unified/upsert.rs Outdated Show resolved Hide resolved
src/connector/src/parser/avro/parser.rs Outdated Show resolved Hide resolved
@@ -339,15 +278,15 @@ mod tests {
let rust_decimal = avro_decimal_to_rust_decimal(avro_decimal, 28, 1).unwrap();
assert_eq!(rust_decimal, rust_decimal::Decimal::try_from(28.1).unwrap());
}

#[ignore]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ignore here

src/connector/src/parser/canal/simd_json_parser.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge first. Let's pay attention to the feedback.

@tabVersion tabVersion mentioned this pull request Jun 15, 2023
@algosday algosday added this pull request to the merge queue Jun 15, 2023
Merged via the queue into main with commit 9d83f88 Jun 15, 2023
@algosday algosday deleted the idx0dev/sourceng branch June 15, 2023 11:07
Little-Wallace added a commit that referenced this pull request Jun 20, 2023
commit ec637af4f5458b1a951d591a3dd7fc6994192e8f
Author: Little-Wallace <[email protected]>
Date:   Tue Jun 20 12:52:47 2023 +0800

    fix config

    Signed-off-by: Little-Wallace <[email protected]>

commit 14641c2
Author: Little-Wallace <[email protected]>
Date:   Mon Jun 19 20:47:43 2023 +0800

    fix config

    Signed-off-by: Little-Wallace <[email protected]>

commit bc252ee
Author: Little-Wallace <[email protected]>
Date:   Mon Jun 19 20:10:51 2023 +0800

    fix busy loop

    Signed-off-by: Little-Wallace <[email protected]>

commit 5b816a6
Merge: 1059c15 02dfee5
Author: Wallace <[email protected]>
Date:   Mon Jun 19 13:59:04 2023 +0800

    Merge branch 'main' into scheduler-split

commit 02dfee5
Author: William Wen <[email protected]>
Date:   Mon Jun 19 13:52:03 2023 +0800

    feat(log-store): implement a merge stream of kv-log-store (#10090)

commit a6c9c39
Author: lmatz <[email protected]>
Date:   Mon Jun 19 13:28:28 2023 +0800

    chore: use github action to auto cherry pick pr to release branch (#10383)

commit 608e183
Author: Bohan Zhang <[email protected]>
Date:   Mon Jun 19 12:18:28 2023 +0800

    fix: support variable scale decimal in avro (#10368)

    Co-authored-by: idx0-dev <[email protected]>

commit 75f6025
Author: zwang28 <[email protected]>
Date:   Sun Jun 18 17:41:15 2023 +0800

    feat(trace): enable await tree trace for compactor (#10381)

commit 321d376
Author: wu <[email protected]>
Date:   Sun Jun 18 15:59:38 2023 +0800

    feat(connector): sink support for elasticsearch (#10357)

commit d13d862
Author: Eric Fu <[email protected]>
Date:   Sun Jun 18 00:26:47 2023 +0800

    feat: add debug profile tools in docker image (#10380)

commit 1059c15
Merge: 9ac9ed4 d26f4bb
Author: Wallace <[email protected]>
Date:   Fri Jun 16 20:49:21 2023 +0800

    Merge branch 'main' into scheduler-split

commit d26f4bb
Author: Yuhao Su <[email protected]>
Date:   Fri Jun 16 18:36:27 2023 +0800

    feat(metrics): add metrics for the evicted watermark for each executors (#10379)

commit 3dd1393
Author: William Wen <[email protected]>
Date:   Fri Jun 16 17:34:34 2023 +0800

    feat(sink): enable delta lake sink (#10374)

commit 9ac9ed4
Merge: 58d8562 5c6b25c
Author: Wallace <[email protected]>
Date:   Fri Jun 16 17:08:38 2023 +0800

    Merge branch 'main' into scheduler-split

commit 7b66d55
Author: William Wen <[email protected]>
Date:   Fri Jun 16 16:49:57 2023 +0800

    fix(docker): install sasl library in docker (#10365)

    Co-authored-by: Eric Fu <[email protected]>

commit 5c6b25c
Author: zwang28 <[email protected]>
Date:   Fri Jun 16 16:10:23 2023 +0800

    feat(ctl): list serving fragment mappings (#10331)

commit 2c2a2b7
Author: Renjie Liu <[email protected]>
Date:   Fri Jun 16 15:49:00 2023 +0800

    fix: Memory counter leak (#10358)

commit 1c1354c
Author: lmatz <[email protected]>
Date:   Fri Jun 16 15:36:00 2023 +0800

    chore: return a warning message when creating sink with order by (#10239)

commit 558cef5
Author: zwang28 <[email protected]>
Date:   Fri Jun 16 13:55:08 2023 +0800

    feat(frontend): support mask failed serving worker temporarily (#10328)

commit 7dccfa3
Author: Bohan Zhang <[email protected]>
Date:   Fri Jun 16 13:03:21 2023 +0800

    chore: fix kafka download path in risedev (#10363)

commit 58d8562
Author: Little-Wallace <[email protected]>
Date:   Fri Jun 16 12:53:47 2023 +0800

    fix config test

    Signed-off-by: Little-Wallace <[email protected]>

commit e77b76b
Author: Little-Wallace <[email protected]>
Date:   Fri Jun 16 12:21:35 2023 +0800

    fix space reclaim miss

    Signed-off-by: Little-Wallace <[email protected]>

commit 2e5a907
Author: Little-Wallace <[email protected]>
Date:   Fri Jun 16 11:10:19 2023 +0800

    merge conflict

    Signed-off-by: Little-Wallace <[email protected]>

commit 1af4ea1
Author: Little-Wallace <[email protected]>
Date:   Wed Jun 14 16:41:54 2023 +0800

    do not check table size for large throughput

    Signed-off-by: Little-Wallace <[email protected]>

commit ccc47a2
Merge: 35199c4 9d83f88
Author: Wallace <[email protected]>
Date:   Fri Jun 16 10:59:19 2023 +0800

    Merge branch 'main' into scheduler-split

commit 9d83f88
Author: idx0-dev <[email protected]>
Date:   Thu Jun 15 18:39:25 2023 +0800

    refactor(source): unified message parser (#10096)

    Co-authored-by: Eric Fu <[email protected]>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

commit 171e212
Author: William Wen <[email protected]>
Date:   Thu Jun 15 16:35:33 2023 +0800

    feat(pinot-demo): add demo for sink to pinot via kafka (#10294)

commit 11d3092
Author: William Wen <[email protected]>
Date:   Thu Jun 15 16:32:54 2023 +0800

    feat(java-binding): bundle jni library to jar (#10229)

commit 56f4011
Author: Yuhao Su <[email protected]>
Date:   Thu Jun 15 16:27:28 2023 +0800

    feat(metrics): add memory usage metrics for more executor (#10351)

commit ea7f95b
Author: Yuanxin Cao <[email protected]>
Date:   Thu Jun 15 15:30:31 2023 +0800

    refactor(sink): prune out hidden columns within sink executor (#10276)

commit d818a00
Author: Tesla Zhang‮ <[email protected]>
Date:   Thu Jun 15 02:58:47 2023 -0400

    refactor(plan_node_fmt): 6 more impls for Distill, refactor all `columns_name` functions (#10344)

commit 26750c9
Author: xxchan <[email protected]>
Date:   Thu Jun 15 08:34:38 2023 +0200

    build: use debug=1 back for release (#10345)

commit ca41717
Author: Renjie Liu <[email protected]>
Date:   Thu Jun 15 14:28:25 2023 +0800

    fix: Batch memory maybe negative (#10338)

commit d95d3a2
Author: zwang28 <[email protected]>
Date:   Thu Jun 15 14:09:33 2023 +0800

    chore(metric): add metric for hummock full GC (#10264)

commit 65f05dd
Author: StrikeW <[email protected]>
Date:   Thu Jun 15 13:10:06 2023 +0800

    test(integration-test): jdbc sink data type tests (#10202)

commit a164ab7
Author: xxchan <[email protected]>
Date:   Thu Jun 15 06:06:18 2023 +0200

    chore: bump typos version and fix typos (#10342)

commit 5cf94c9
Author: xxchan <[email protected]>
Date:   Wed Jun 14 16:18:53 2023 +0200

    feat: support scalar function in FROM clause (#10317)

commit 9593d1b
Author: Tesla Zhang‮ <[email protected]>
Date:   Wed Jun 14 08:40:29 2023 -0400

    refactor(plan_node_fmt): 4 more impls for Distill (#10296)

commit 5b38239
Author: xxchan <[email protected]>
Date:   Wed Jun 14 13:20:21 2023 +0200

    fix: replace ouroboros with self_cell (#10316)

commit 90ee868
Author: Xinjing Hu <[email protected]>
Date:   Wed Jun 14 19:00:27 2023 +0800

    feat(expr, agg): support `PERCENTILE_CONT`, `PERCENTILE_DISC` and `MODE` aggregation (#10252)

    Signed-off-by: Richard Chien <[email protected]>
    Co-authored-by: Richard Chien <[email protected]>
    Co-authored-by: Noel Kwan <[email protected]>

commit e3fe51b
Author: congyi wang <[email protected]>
Date:   Wed Jun 14 17:41:39 2023 +0800

    refactor(log): change `aws_credential_types::cache::lazy_caching` log level to WARN (#10333)

commit 33694b1
Author: stonepage <[email protected]>
Date:   Wed Jun 14 17:11:22 2023 +0800

    refactor(binder): bind create table (#10307)

commit 02a110c
Author: Noel Kwan <[email protected]>
Date:   Wed Jun 14 16:18:10 2023 +0800

    feat(storage): support replicated `LocalHummockStorage` (#10226)

commit ede3278
Author: Richard Chien <[email protected]>
Date:   Wed Jun 14 16:02:34 2023 +0800

    refactor(common): add `MemcmpEncoded` struct to represent memcmp encoded data (#10319)

    Signed-off-by: Richard Chien <[email protected]>

commit ff91a4a
Author: Li0k <[email protected]>
Date:   Wed Jun 14 15:56:21 2023 +0800

    refactor(storage): refactor hummock timer loop (#10164)

commit 353da76
Author: Richard Chien <[email protected]>
Date:   Wed Jun 14 15:06:07 2023 +0800

    fix(macro): support `derive(EstimateSize)` on tuple struct (#10318)

    Signed-off-by: Richard Chien <[email protected]>
    Co-authored-by: Yuhao Su <[email protected]>

commit 7dd388b
Author: Runji Wang <[email protected]>
Date:   Wed Jun 14 14:52:48 2023 +0800

    doc(udf): document Java UDF (#10320)

commit e4aec8b
Author: xiangjinwu <[email protected]>
Date:   Wed Jun 14 14:15:36 2023 +0800

    feat(binder): support `group by` output alias or index (#10305)

commit 8eb0e43
Author: Huangjw <[email protected]>
Date:   Wed Jun 14 11:01:28 2023 +0800

    fix(ci): fix release script (#10325)

commit 86f734c
Author: Shanicky Chen <[email protected]>
Date:   Wed Jun 14 03:45:42 2023 +0800

    fix: Increase timeout for end-to-end test (parallel) (dev mode) (#10308)

    Co-authored-by: xxchan <[email protected]>

commit e02ef6c
Author: Eric Fu <[email protected]>
Date:   Wed Jun 14 03:42:56 2023 +0800

    fix: jemalloc profiling (#10314)

    Co-authored-by: xxchan <[email protected]>

commit 07f6b52
Author: xxchan <[email protected]>
Date:   Tue Jun 13 21:31:21 2023 +0200

    fix: use alias as table function's column name (#10311)

commit 3017aa2
Author: xxchan <[email protected]>
Date:   Tue Jun 13 20:58:37 2023 +0200

    ci: download dependencies from s3 (#9782)

commit f971965
Author: zwang28 <[email protected]>
Date:   Tue Jun 13 19:24:45 2023 +0800

    refactor(batch): maintain serving vnode mapping in meta node (#10004)

commit 2b2950d
Author: Zhanxiang (Patrick) Huang <[email protected]>
Date:   Tue Jun 13 19:07:40 2023 +0800

    refactor: replace minstant/minitrace with tokio instant/tracing (#10302)

commit 9177034
Author: congyi wang <[email protected]>
Date:   Tue Jun 13 18:08:51 2023 +0800

    feat(metrics): monitor s3 sdk retry (#9790)

commit 16a0efc
Author: Runji Wang <[email protected]>
Date:   Tue Jun 13 17:58:34 2023 +0800

    feat(udf): Java UDF SDK (#10095)

commit 2b2ea49
Author: Eric Fu <[email protected]>
Date:   Tue Jun 13 17:19:35 2023 +0800

    fix(metrics): incorrect FP rate (#10300)

commit 54c660b
Author: lmatz <[email protected]>
Date:   Tue Jun 13 16:57:07 2023 +0800

    chore: remove enable_stream_row_count config (#10261)

commit a6f38d9
Author: Shanicky Chen <[email protected]>
Date:   Tue Jun 13 16:30:40 2023 +0800

    feat: Add revision for rescheduling process (#10199)

    Signed-off-by: Shanicky Chen <[email protected]>

Signed-off-by: Little-Wallace <[email protected]>
Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks so exciting to me!

Comment on lines +72 to +74
let mut primary_key = msg.primary_key.to_vec();
let mut record = msg.record.to_vec();
let key_decoded = simd_json::to_borrowed_value(&mut primary_key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using Cow::to_mut here?

Comment on lines +61 to +66
impl<K, V> Access for UpsertChangeEvent<K, V>
where
K: Access,
V: Access,
{
fn access(&self, path: &[&str], type_expected: Option<&DataType>) -> super::AccessResult {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this implementation is not actually being used. Can we splitting it into access_key and access_value.

) -> std::result::Result<WriteGuard, RwError> {
match row_op.op()? {
super::ChangeEventOperation::Upsert => writer.insert(|column| {
let res = row_op.access_field(&column.name, &column.data_type);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some change log formats, it seems we have to call op() again in access_field. Can we avoid this duplicated call?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, since we always treat Update as Upsert, the before of a Debezium record is always dropped here. I'm not sure whether this matters as we're not respecting the retractable property of the original change log. cc @st1page @tabVersion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/connector type/refactor user-facing-changes Contains changes that are visible to users 📖✗ No user documentation is needed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants