feat(source): encode json option timestamptz.handling.mode #16265

Merged: 3 commits, Apr 12, 2024

@xiangjinwu xiangjinwu commented Apr 11, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Similar to sink, we allow specifying how timestamptz values are interpreted, including the unit of numeric values:

  • micro
  • milli
  • guess_number_unit: This has been the default, but limits the range to [1973-03-03 09:46:40, 5138-11-16 09:46:40)
  • utc_string: This format is least ambiguous and can mostly be inferred correctly without the need to be specified.
  • utc_without_suffix: This allows the user to acknowledge that a naive timestamp (one without a timezone suffix) is in UTC rather than local time.
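
The range attached to guess_number_unit follows from guessing the unit by magnitude: 1973-03-03 09:46:40 UTC is 10^8 seconds after the epoch, and 5138-11-16 09:46:40 UTC is 10^11 seconds. The sketch below is a minimal illustration, assuming power-of-ten thresholds chosen to be consistent with that range. The function names are hypothetical and this is not presented as the actual RisingWave implementation.

```rust
// Assumed thresholds: each unit owns one band of three decimal orders of magnitude.
const E11: u64 = 100_000_000_000;
const E14: u64 = 100_000_000_000_000;
const E17: u64 = 100_000_000_000_000_000;

/// Hypothetical sketch of `guess_number_unit`: pick the unit from the
/// magnitude of the number and return microseconds since the Unix epoch.
pub fn guess_number_unit_to_micros(n: i64) -> Option<i64> {
    let magnitude = n.unsigned_abs();
    if magnitude < E11 {
        n.checked_mul(1_000_000) // treated as seconds
    } else if magnitude < E14 {
        n.checked_mul(1_000) // treated as milliseconds
    } else if magnitude < E17 {
        Some(n) // treated as microseconds
    } else {
        Some(n / 1_000) // treated as nanoseconds
    }
}

/// Hypothetical sketch of the explicit `milli` mode: no guessing involved.
pub fn milli_to_micros(n: i64) -> Option<i64> {
    n.checked_mul(1_000)
}
```

Under these assumptions, a millisecond value just below 10^11 (an instant shortly before 1973-03-03 09:46:40) would be misread as seconds by the guessing path, whereas the explicit milli mode converts it as intended.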

Note the corresponding format has to be one of plain | upsert | debezium. The following formats do NOT accept this option as of now:

  • debezium_mongo
  • canal
  • maxwell
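
As a rough illustration of the rule above, the following sketch parses the option value and rejects it for formats that do not accept it. `TimestamptzHandling` and its `GuessNumberUnit` variant appear in this PR's diff; the other variant names, the `resolve_mode` helper, and the error messages are assumptions made for this example.

```rust
use std::str::FromStr;

/// The five modes. Variant names other than `GuessNumberUnit` are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestamptzHandling {
    Micro,
    Milli,
    GuessNumberUnit,
    UtcString,
    UtcWithoutSuffix,
}

impl FromStr for TimestamptzHandling {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "micro" => Ok(Self::Micro),
            "milli" => Ok(Self::Milli),
            "guess_number_unit" => Ok(Self::GuessNumberUnit),
            "utc_string" => Ok(Self::UtcString),
            "utc_without_suffix" => Ok(Self::UtcWithoutSuffix),
            other => Err(format!("unrecognized timestamptz.handling.mode: {other}")),
        }
    }
}

/// Hypothetical helper: returns `Ok(None)` when the option is absent,
/// and an error when the format does not accept the option.
pub fn resolve_mode(
    format: &str,
    option: Option<&str>,
) -> Result<Option<TimestamptzHandling>, String> {
    let raw = match option {
        Some(raw) => raw,
        None => return Ok(None),
    };
    match format {
        "plain" | "upsert" | "debezium" => raw.parse().map(Some),
        _ => Err(format!(
            "format {format} does not accept timestamptz.handling.mode"
        )),
    }
}
```

Returning `None` for an absent option leaves the choice of default to the caller, which matches the `timestamptz_handling: None` field seen in the diff.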

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

https://docs.risingwave.com/docs/current/supported-sources-and-formats/#debezium-json

When using format plain | upsert | debezium encode json (but not format debezium_mongo | canal | maxwell encode json), there is a new option timestamptz.handling.mode. There are 5 possible choices. See description above for the meaning of each one.

@@ -66,7 +67,7 @@ impl PlainParser {
         };

         let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
-            DebeziumJsonAccessBuilder::new()?,
+            DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?,
Contributor Author

@StrikeW Why do we have DebeziumJsonAccessBuilder as part of PlainParser?

Contributor

The shared CDC source uses the plain parser, and to parse transaction metadata we need a Debezium parser.

Contributor Author

Given it is system metadata I believe [1973-03-03 09:46:40, 5138-11-16 09:46:40) is enough. Additionally, the structure of the metadata may be static rather than dynamic, so maybe we can derive(serde::Deserialize) rather than accessing a json Value. But this can be a separate issue.

@@ -607,6 +607,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) {
             key_encoding_config: None,
             encoding_config: EncodingProperties::Json(JsonProperties {
                 use_schema_registry: false,
+                timestamptz_handling: None,
Contributor Author

@StrikeW Is it possible that timestamptz here is always one of micro/milli/utc_string? Being specific would read better than the legacy default guess_number_unit, but if you are unsure now, at least the current implementation is semantically equivalent to the previous one.

Contributor

I think it is possible. But we need to implement a CustomConverter to enforce that. The default behavior depends on the precision of the upstream data types.

Contributor Author

I see. So this code path would remain limited by [1973-03-03 09:46:40, 5138-11-16 09:46:40).

@tabVersion
Contributor

The approach LGTM, since the guessing one can be wrong sometimes. Just wondering what we do if the schema contains multiple timestamptz fields and they have different units? 😇

@xiangjinwu
Contributor Author

xiangjinwu commented Apr 12, 2024

> The approach LGTM, since the guessing one can be wrong sometimes. Just wondering what we do if the schema contains multiple timestamptz fields and they have different units? 😇

When there are multiple timestamptz fields with different units, a per-column schema provided by Kafka Connect converters (json, json schema, avro, protobuf) would be necessary. The latter 3 options require a schema registry, while the first (json) embeds the schema into every message, which is costly and can be disabled with schemas.enable=false. So the user has the following choices:

  • Use avro/protobuf/json schema with schema registry
  • Use json with schemas.enable=true, without schema registry but every message would be large
  • Use json with schemas.enable=false, but then all timestamptz fields are required to use the same unit

See #16097 (comment) for a concrete example of json when schemas.enable is not set to false by user.

This PR supports the 3rd use case, and we would support the former 2 (Kafka Connect schemas) when needed.
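
To illustrate why a per-column schema resolves mixed units, here is a minimal sketch that picks the unit from a field's Kafka Connect schema name. The schema names are the semantic type names used by Debezium and Kafka Connect; the `Unit` enum and the helper functions are hypothetical and do not describe anything this PR implements.

```rust
/// Unit of an integer timestamp, as declared by the field's schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Unit {
    Milli,
    Micro,
    Nano,
}

/// Map a Kafka Connect / Debezium semantic type name to a unit.
pub fn unit_from_schema_name(name: &str) -> Option<Unit> {
    match name {
        "io.debezium.time.Timestamp" | "org.apache.kafka.connect.data.Timestamp" => {
            Some(Unit::Milli)
        }
        "io.debezium.time.MicroTimestamp" => Some(Unit::Micro),
        "io.debezium.time.NanoTimestamp" => Some(Unit::Nano),
        _ => None,
    }
}

/// Convert an integer in the given unit to microseconds since the epoch.
pub fn to_micros(unit: Unit, n: i64) -> Option<i64> {
    match unit {
        Unit::Milli => n.checked_mul(1_000),
        Unit::Micro => Some(n),
        Unit::Nano => Some(n.div_euclid(1_000)),
    }
}

/// Convert one field using the unit declared by its own schema,
/// so two columns in the same message may use different units.
pub fn convert_field(schema_name: &str, n: i64) -> Option<i64> {
    to_micros(unit_from_schema_name(schema_name)?, n)
}
```

With schemas.enable=false no such name is available in the message, which is why a single source-wide timestamptz.handling.mode has to apply to every column.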

e2e_test/source/basic/handling_mode.slt (review thread resolved)

Contributor

@tabVersion tabVersion left a comment

LGTM. Do we have plans to publish other params in JsonParseOptions?

@xiangjinwu
Contributor Author

> LGTM. Do we have plans to publish other params in JsonParseOptions?

Yes, and we also plan to unify them with the sink ones (similar, yet with minor differences). But timestamptz control does seem to be the most popular one.

@xiangjinwu xiangjinwu added this pull request to the merge queue Apr 12, 2024