Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl key encode clause for sink #16377

Merged
merged 31 commits into from
May 8, 2024
Merged

feat: impl key encode clause for sink #16377

merged 31 commits into from
May 8, 2024

Conversation

tabVersion
Copy link
Contributor

@tabVersion tabVersion commented Apr 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

as title, introducing key encode clause in sink statement.

the whole statement looks like:

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='kafka',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value'
) ]
[KEY ENCODE key_encode [(...)]]

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

adding the key encode at the end of the sink statement.

The key encode can only be TEXT now, and the primary key specified in SQL should be one and only one of the types (varchar, bool, small int, int, big int)

Copy link

gitguardian bot commented Apr 29, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
9425213 Triggered Generic Password 3ba944b e2e_test/source/cdc/cdc.validate.postgres.slt View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

@tabVersion tabVersion marked this pull request as ready for review April 30, 2024 08:38
@tabVersion tabVersion added the user-facing-changes Contains changes that are visible to users label Apr 30, 2024
@tabVersion
Copy link
Contributor Author

wait for #16575 merge first.

@fuyufjh fuyufjh self-requested a review May 7, 2024 03:40
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM

UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we expand the cartesian product of

{Append-only, Upsert} x {Text (key) or not} x {Json, Proto, Template}

Is it possible to separate them?

Copy link
Contributor

@xiangjinwu xiangjinwu May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly. The caller of formatter does not need to know whether it is AvroEncoder or JsonEncoder, but it still need to know whether the output of an encoder SerTo a (utf8) String or not (Vec<u8>). So it may be possible to reduce into {Append-only, Upsert} x {String, Vec<u8>} x {String, Vec<u8>} (or possibly {Append-only, Upsert} x {String, Vec<u8>} x {Vec<u8>} if all MQs allow payload to be non-utf8 bytes). But the refactor wont fit into this PR. I will give it a try after this. #16628

src/frontend/src/optimizer/plan_node/stream_sink.rs Outdated Show resolved Hide resolved
| DataType::Int16
| DataType::Int32
| DataType::Int64 => {}
_ => {
Copy link
Member

@fuyufjh fuyufjh May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please explicitly exhaust all data types here. It helps to you to think about all the data types, and also reminds the other people to maintain this code when new data type is introduced.

And I think many data types are missing, such as Decimal, Int256, Timestamp, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to #16377 (comment) for a list of all data types, and why we want to be conservative on some data types initially.

src/connector/src/sink/formatter/mod.rs Outdated Show resolved Hide resolved
(F::AppendOnly, E::Json, Some(E::Text)) => {
Impl::AppendOnlyTextJson(build(p).await?)
}
(F::AppendOnly, E::Json, None | Some(_)) => Impl::AppendOnlyJson(build(p).await?),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some(_) shall not be ignored silently, although it probably already checked in frontend.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a second thought, I only keep (F::AppendOnly, E::Json, None) for append-only json. Other key encode config is invalid and should be rejected (although it is done in the frontend).

src/connector/src/sink/encoder/text.rs Outdated Show resolved Hide resolved
src/connector/src/sink/encoder/text.rs Outdated Show resolved Hide resolved
Comment on lines 27 to 28
pub fn new(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to restrict the data type here:

  • allowed: bool, int16, int32, int64, varchar, serial, int256
  • denied
  • maybe
    • decimal
      • 1.0 and 1.00 are equal as decimal but unequal as text
    • date
      • -0001-01-01 is same as 0002-01-01 BC
    • time, timestamptz
      • 12:34:56.020 is same as 12:34:56.02 (i.e. pad to 3/6/9 or not)
    • timestamp
      • 2024-01-01 12:34:56 is neither ISO 8601 nor RFC 3339 but a mixture of both.

src/sqlparser/src/ast/statement.rs Outdated Show resolved Hide resolved
src/sqlparser/src/ast/statement.rs Outdated Show resolved Hide resolved
src/sqlparser/src/ast/statement.rs Outdated Show resolved Hide resolved
Comment on lines 326 to 327
if let Some(format_desc) = &format_desc
&& let Some(key_encode) = &format_desc.key_encode
Copy link
Contributor

@xiangjinwu xiangjinwu May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encoder-specific check in EncoderBuild::build() and ::new(). This file can be untouched.

The differences between new and build:

  • New is supposed to be the minimal parameter required by the encoder itself. So it shall accept a single col_index: usize.
  • Build is supposed to adapt the common input to the ones required by new. So it checks whether pk_indices contains exactly one column.

primary_key = 'id')
format plain encode json (
force_append_only='true'
) key encode text ;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ingest it back with include key as and compare it with expected value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already included in the test.

Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
Signed-off-by: tabVersion <[email protected]>
e2e_test/sink/kafka/create_sink.slt Outdated Show resolved Hide resolved
src/sqlparser/src/ast/legacy_source.rs Outdated Show resolved Hide resolved
src/sqlparser/src/ast/statement.rs Outdated Show resolved Hide resolved
src/sqlparser/src/parser.rs Outdated Show resolved Hide resolved
src/connector/src/sink/formatter/mod.rs Outdated Show resolved Hide resolved
src/connector/src/sink/formatter/mod.rs Outdated Show resolved Hide resolved
Comment on lines 51 to 52
let datum = row.datum_at(col_index);
result = datum.to_text_with_type(&self.schema.fields[col_index].data_type);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second point of #16377 (comment) unresolved:

  • Let's make an exception for bool to return true/false rather than t/f used by PostgreSQL ToText. (Similarly in rw-expr and pg-proc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I thought it is to_text_with_type 's default behavior. Let me fix.

Canal,
// Keyword::UPSERT
Upsert,
// Keyword::PLAIN
Copy link
Member

@fuyufjh fuyufjh May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intended? Or just a formatter issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a RustRover's bug, it has its own formatter instead of using the rust toolchain one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember there is an option like "use reustfmt instead of the built-in formatter". Btw, don't forget to fix this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed already

src/sqlparser/src/ast/legacy_source.rs Outdated Show resolved Hide resolved
src/sqlparser/src/ast/statement.rs Outdated Show resolved Hide resolved
Signed-off-by: tabVersion <[email protected]>
Copy link
Contributor

@xiangjinwu xiangjinwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM

let data_type = &self.schema.fields[col_index].data_type;
if data_type == &DataType::Boolean {
result = datum
.unwrap_or(ScalarRefImpl::Bool(false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point ... I do not feel outputting false is right when the input is null. I just checked ToText for Datum and it is outputting NULL. Maybe keep consistent here.

As for PostgreSQL, there is no text or binary format for null. It is always indicated by a side channel.

  • The typoutput procedure for each pg_type always return null::varchar.
  • In psql it is empty string by default but customizable via \pset null.
  • The typoutput of an array element (rather than the whole array) outputs NULL.
  • The typoutput of a record field (rather than the whole record) outputs empty string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. And thanks for explaining the pg behavior. Then why we make an exception for the boolean type? Directly using the ToText trait meets the behavior above and is consistent with other types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we make an exception for the boolean type

PostgreSQL ToText PostgreSQL v::varchar Popular in other programming languages TextEncoder
bool t/f true/false true/false follow v::varchar
list {1,2,3} {1,2,3} [1,2,3] disallowed
int 49 49 49 49 (rather than 0x31)

So actually we only allowed unambiguous data types 😂 Or we could say PostgreSQL also made an exception for bool in v::varchar which we follow, while it is unfortunate there is no such trait ...

src/connector/src/sink/formatter/mod.rs Outdated Show resolved Hide resolved
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Canal,
// Keyword::UPSERT
Upsert,
// Keyword::PLAIN
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember there is an option like "use reustfmt instead of the built-in formatter". Btw, don't forget to fix this

@tabVersion tabVersion added this pull request to the merge queue May 8, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks May 8, 2024
@tabVersion tabVersion enabled auto-merge May 8, 2024 21:32
@tabVersion tabVersion added this pull request to the merge queue May 8, 2024
Merged via the queue into main with commit 926ba1b May 8, 2024
27 of 30 checks passed
@tabVersion tabVersion deleted the tab/key-encode branch May 8, 2024 22:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants