Skip to content

Commit

Permalink
feat(sink): support encode jsonb data as dynamic json type in sink (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Aug 2, 2024
1 parent d4b421a commit 35488ce
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 15 deletions.
44 changes: 38 additions & 6 deletions src/connector/src/sink/encoder/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use serde_json::{json, Map, Value};
use thiserror_ext::AsReport;

use super::{
CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result,
RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
CustomJsonType, DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, KafkaConnectParamsRef,
Result, RowEncoder, SerTo, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
use crate::sink::SinkError;

Expand All @@ -41,6 +41,7 @@ pub struct JsonEncoderConfig {
timestamp_handling_mode: TimestampHandlingMode,
timestamptz_handling_mode: TimestamptzHandlingMode,
custom_json_type: CustomJsonType,
jsonb_handling_mode: JsonbHandlingMode,
}

pub struct JsonEncoder {
Expand All @@ -58,13 +59,15 @@ impl JsonEncoder {
timestamp_handling_mode: TimestampHandlingMode,
timestamptz_handling_mode: TimestamptzHandlingMode,
time_handling_mode: TimeHandlingMode,
jsonb_handling_mode: JsonbHandlingMode,
) -> Self {
let config = JsonEncoderConfig {
time_handling_mode,
date_handling_mode,
timestamp_handling_mode,
timestamptz_handling_mode,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode,
};
Self {
schema,
Expand All @@ -81,6 +84,7 @@ impl JsonEncoder {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::Es,
jsonb_handling_mode: JsonbHandlingMode::Dynamic,
};
Self {
schema,
Expand All @@ -101,6 +105,7 @@ impl JsonEncoder {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::Doris(map),
jsonb_handling_mode: JsonbHandlingMode::String,
};
Self {
schema,
Expand All @@ -117,6 +122,7 @@ impl JsonEncoder {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::StarRocks,
jsonb_handling_mode: JsonbHandlingMode::Dynamic,
};
Self {
schema,
Expand Down Expand Up @@ -289,11 +295,12 @@ fn datum_to_json_object(
(DataType::Interval, ScalarRefImpl::Interval(v)) => {
json!(v.as_iso_8601())
}
(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match &config.custom_json_type {
CustomJsonType::Es | CustomJsonType::StarRocks => JsonbVal::from(jsonb_ref).take(),
CustomJsonType::Doris(_) | CustomJsonType::None => {

(DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match config.jsonb_handling_mode {
JsonbHandlingMode::String => {
json!(jsonb_ref.to_string())
}
JsonbHandlingMode::Dynamic => JsonbVal::from(jsonb_ref).take(),
},
(DataType::List(datatype), ScalarRefImpl::List(list_ref)) => {
let elems = list_ref.iter();
Expand Down Expand Up @@ -446,6 +453,7 @@ mod tests {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode: JsonbHandlingMode::String,
};

let boolean_value = datum_to_json_object(
Expand Down Expand Up @@ -517,6 +525,7 @@ mod tests {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode: JsonbHandlingMode::String,
};

let tstz_inner = "2018-01-26T18:30:09.453Z".parse().unwrap();
Expand All @@ -537,6 +546,7 @@ mod tests {
timestamp_handling_mode: TimestampHandlingMode::Milli,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode: JsonbHandlingMode::String,
};
let ts_value = datum_to_json_object(
&Field {
Expand Down Expand Up @@ -603,6 +613,7 @@ mod tests {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::Doris(map),
jsonb_handling_mode: JsonbHandlingMode::String,
};
let decimal = datum_to_json_object(
&Field {
Expand All @@ -628,11 +639,12 @@ mod tests {
assert_eq!(date_value, json!(719163));

let from_epoch_config = JsonEncoderConfig {
time_handling_mode: TimeHandlingMode::Milli,
time_handling_mode: TimeHandlingMode::String,
date_handling_mode: DateHandlingMode::FromEpoch,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode: JsonbHandlingMode::String,
};
let date_value = datum_to_json_object(
&Field {
Expand All @@ -651,6 +663,7 @@ mod tests {
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::Doris(HashMap::default()),
jsonb_handling_mode: JsonbHandlingMode::String,
};
let date_value = datum_to_json_object(
&Field {
Expand Down Expand Up @@ -683,6 +696,25 @@ mod tests {
)
.unwrap();
assert_eq!(interval_value, json!("{\"v3\":3,\"v2\":2,\"v1\":1}"));

let encode_jsonb_obj_config = JsonEncoderConfig {
time_handling_mode: TimeHandlingMode::String,
date_handling_mode: DateHandlingMode::String,
timestamp_handling_mode: TimestampHandlingMode::String,
timestamptz_handling_mode: TimestamptzHandlingMode::UtcString,
custom_json_type: CustomJsonType::None,
jsonb_handling_mode: JsonbHandlingMode::Dynamic,
};
let json_value = datum_to_json_object(
&Field {
data_type: DataType::Jsonb,
..mock_field.clone()
},
Some(ScalarImpl::Jsonb(JsonbVal::from(json!([1, 2, 3]))).as_scalar_ref_impl()),
&encode_jsonb_obj_config,
)
.unwrap();
assert_eq!(json_value, json!([1, 2, 3]));
}

#[test]
Expand Down
25 changes: 25 additions & 0 deletions src/connector/src/sink/encoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,31 @@ pub enum CustomJsonType {
None,
}

/// How the jsonb type is encoded.
///
/// - `String`: encode jsonb as string. `[1, true, "foo"] -> "[1, true, \"foo\"]"`
/// - `Dynamic`: encode jsonb as json type dynamically. `[1, true, "foo"] -> [1, true, "foo"]`
pub enum JsonbHandlingMode {
String,
Dynamic,
}

impl JsonbHandlingMode {
pub const OPTION_KEY: &'static str = "jsonb.handling.mode";

pub fn from_options(options: &BTreeMap<String, String>) -> Result<Self> {
match options.get(Self::OPTION_KEY).map(std::ops::Deref::deref) {
Some("string") | None => Ok(Self::String),
Some("dynamic") => Ok(Self::Dynamic),
Some(v) => Err(super::SinkError::Config(anyhow::anyhow!(
"unrecognized {} value {}",
Self::OPTION_KEY,
v
))),
}
}
}

#[derive(Debug)]
struct FieldEncodeError {
message: String,
Expand Down
7 changes: 5 additions & 2 deletions src/connector/src/sink/formatter/debezium_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use tracing::warn;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::{
DateHandlingMode, JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
DateHandlingMode, JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode,
TimestampHandlingMode, TimestamptzHandlingMode,
};
use crate::tri;

Expand Down Expand Up @@ -69,6 +69,7 @@ impl DebeziumJsonFormatter {
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
);
let val_encoder = JsonEncoder::new(
schema.clone(),
Expand All @@ -77,6 +78,7 @@ impl DebeziumJsonFormatter {
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
);
Self {
schema,
Expand Down Expand Up @@ -397,6 +399,7 @@ mod tests {
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
);
let json_chunk = chunk_to_json(chunk, &encoder).unwrap();
let schema_json = schema_to_json(&schema, "test_db", "test_table");
Expand Down
5 changes: 4 additions & 1 deletion src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::text::TextEncoder;
use super::encoder::{
DateHandlingMode, KafkaConnectParams, TimeHandlingMode, TimestamptzHandlingMode,
DateHandlingMode, JsonbHandlingMode, KafkaConnectParams, TimeHandlingMode,
TimestamptzHandlingMode,
};
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{
Expand Down Expand Up @@ -113,13 +114,15 @@ pub trait EncoderBuild: Sized {
impl EncoderBuild for JsonEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
let jsonb_handling_mode = JsonbHandlingMode::from_options(&b.format_desc.options)?;
let encoder = JsonEncoder::new(
b.schema,
pk_indices,
DateHandlingMode::FromCe,
TimestampHandlingMode::Milli,
timestamptz_mode,
TimeHandlingMode::Milli,
jsonb_handling_mode,
);
let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ mod test {

use super::*;
use crate::sink::encoder::{
DateHandlingMode, JsonEncoder, TimeHandlingMode, TimestampHandlingMode,
DateHandlingMode, JsonEncoder, JsonbHandlingMode, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use crate::sink::formatter::AppendOnlyFormatter;
Expand Down Expand Up @@ -778,6 +778,7 @@ mod test {
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
),
)),
)
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/sink/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use with_options::WithOptions;

use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::{
DateHandlingMode, JsonEncoder, ProtoEncoder, ProtoHeader, RowEncoder, SerTo, TimeHandlingMode,
TimestampHandlingMode, TimestamptzHandlingMode,
DateHandlingMode, JsonEncoder, JsonbHandlingMode, ProtoEncoder, ProtoHeader, RowEncoder, SerTo,
TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
use super::writer::AsyncTruncateSinkWriterExt;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
Expand Down Expand Up @@ -221,7 +221,7 @@ impl MqttSinkWriter {
}

let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?;

let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
let encoder = match format_desc.format {
SinkFormat::AppendOnly => match format_desc.encode {
SinkEncode::Json => RowEncoderWrapper::Json(JsonEncoder::new(
Expand All @@ -231,6 +231,7 @@ impl MqttSinkWriter {
TimestampHandlingMode::Milli,
timestamptz_mode,
TimeHandlingMode::Milli,
jsonb_handling_mode,
)),
SinkEncode::Protobuf => {
let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor(
Expand Down
5 changes: 4 additions & 1 deletion src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use with_options::WithOptions;

use super::encoder::{DateHandlingMode, TimeHandlingMode, TimestamptzHandlingMode};
use super::encoder::{
DateHandlingMode, JsonbHandlingMode, TimeHandlingMode, TimestamptzHandlingMode,
};
use super::utils::chunk_to_json;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::connector_common::NatsCommon;
Expand Down Expand Up @@ -145,6 +147,7 @@ impl NatsSinkWriter {
TimestampHandlingMode::Milli,
TimestamptzHandlingMode::UtcWithoutSuffix,
TimeHandlingMode::Milli,
JsonbHandlingMode::String,
),
})
}
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use uuid::Uuid;
use with_options::WithOptions;

use super::encoder::{
JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
TimestamptzHandlingMode,
};
use super::snowflake_connector::{generate_s3_file_name, SnowflakeHttpClient, SnowflakeS3Client};
use super::writer::LogSinkerOf;
Expand Down Expand Up @@ -231,6 +232,7 @@ impl SnowflakeSinkWriter {
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::String,
JsonbHandlingMode::String,
),
// initial value of `epoch` will be set to 0
epoch: 0,
Expand Down

0 comments on commit 35488ce

Please sign in to comment.