From 82c03f9a332548fe88cddbbb3966db0d564363d4 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 4 Mar 2024 17:54:38 -0500 Subject: [PATCH 01/38] basic structure for snowflake sink --- src/connector/src/sink/mod.rs | 4 + src/connector/src/sink/snowflake.rs | 145 ++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 src/connector/src/sink/snowflake.rs diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 6abe8d93b595..5446f2ea8b51 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -33,6 +33,7 @@ pub mod pulsar; pub mod redis; pub mod remote; pub mod starrocks; +pub mod snowflake; pub mod test_sink; pub mod trivial; pub mod utils; @@ -88,6 +89,7 @@ macro_rules! for_all_sinks { { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, + { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, @@ -515,6 +517,8 @@ pub enum SinkError { ), #[error("Starrocks error: {0}")] Starrocks(String), + #[error("Snowflake error: {0}")] + Snowflake(String), #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs new file mode 100644 index 000000000000..a6b53288c5f6 --- /dev/null +++ b/src/connector/src/sink/snowflake.rs @@ -0,0 +1,145 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::HashMap, sync::Arc}; + +use anyhow::anyhow; +use serde::Deserialize; +use serde_derive::Serialize; +use serde_json::Value; +use serde_with::serde_as; +use async_trait::async_trait; +use with_options::WithOptions; +use risingwave_common::{array::StreamChunk, buffer::Bitmap, catalog::Schema}; + +use super::{encoder::JsonEncoder, writer::LogSinkerOf, SinkError, SinkParam}; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; + +pub const SNOWFLAKE_SINK: &str = "snowflake"; + +// TODO: add comments +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct SnowflakeCommon { + #[serde(rename = "snowflake.url")] + pub url: String, + + #[serde(rename = "snowflake.database")] + pub database: String, + + #[serde(rename = "snowflake.user")] + pub user: String, + + #[serde(rename = "snowflake.private.key")] + pub private_key: String, + + #[serde(rename = "snowflake.private.key.passphrase")] + pub private_key_passphrase: Option, + + #[serde(rename = "snowflake.role")] + pub role: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct SnowflakeConfig { + #[serde(flatten)] + pub common: SnowflakeCommon, +} + +impl SnowflakeConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + Ok(config) + } +} + +#[derive(Debug)] +pub struct SnowflakeSink { + pub config: SnowflakeConfig, + schema: Schema, + pk_indices: Vec, + 
is_append_only: bool, +} + +impl Sink for SnowflakeSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + const SINK_NAME: &'static str = SNOWFLAKE_SINK; + + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + todo!() + } + + async fn validate(&self) -> Result<()> { + todo!() + } +} + +pub struct SnowflakeSinkWriter { + pub config: SnowflakeSink, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + client: Option, + row_encoder: JsonEncoder, +} + +impl TryFrom for SnowflakeSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = SnowflakeConfig::from_hashmap(param.properties)?; + Ok(SnowflakeSink { + config, + schema, + pk_indices: param.downstream_pk, + is_append_only: param.sink_type.is_append_only(), + }) + } +} + +#[async_trait] +impl SinkWriter for SnowflakeSinkWriter { + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + todo!() + } + + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + todo!() + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result { + todo!() + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } +} + +pub struct SnowflakeClient { + +} + +impl SnowflakeClient { + +} \ No newline at end of file From 03c5aa072d6397096610146110b1d3bba58c4c0c Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 5 Mar 2024 20:03:40 -0500 Subject: [PATCH 02/38] fix format --- src/connector/src/sink/mod.rs | 2 +- src/connector/src/sink/snowflake.rs | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 5446f2ea8b51..c42457632d4c 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -32,8 +32,8 @@ pub mod nats; pub mod pulsar; pub mod redis; pub 
mod remote; -pub mod starrocks; pub mod snowflake; +pub mod starrocks; pub mod test_sink; pub mod trivial; pub mod utils; diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index a6b53288c5f6..a72dcb26a953 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -12,18 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; +use std::sync::Arc; use anyhow::anyhow; +use async_trait::async_trait; +use risingwave_common::array::StreamChunk; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; -use async_trait::async_trait; use with_options::WithOptions; -use risingwave_common::{array::StreamChunk, buffer::Bitmap, catalog::Schema}; -use super::{encoder::JsonEncoder, writer::LogSinkerOf, SinkError, SinkParam}; +use super::encoder::JsonEncoder; +use super::writer::LogSinkerOf; +use super::{SinkError, SinkParam}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; @@ -136,10 +141,6 @@ impl SinkWriter for SnowflakeSinkWriter { } } -pub struct SnowflakeClient { - -} - -impl SnowflakeClient { +pub struct SnowflakeClient {} -} \ No newline at end of file +impl SnowflakeClient {} From e9da4667caf89c366481949180f0b739dab12251 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 17:48:08 -0500 Subject: [PATCH 03/38] update snowflake common --- src/connector/src/sink/snowflake.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index a72dcb26a953..0cb86176fc38 100644 --- a/src/connector/src/sink/snowflake.rs +++ 
b/src/connector/src/sink/snowflake.rs @@ -36,14 +36,17 @@ pub const SNOWFLAKE_SINK: &str = "snowflake"; // TODO: add comments #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { - #[serde(rename = "snowflake.url")] - pub url: String, - #[serde(rename = "snowflake.database")] pub database: String, - #[serde(rename = "snowflake.user")] - pub user: String, + #[serde(rename = "snowflake.database.schema")] + pub schema: String, + + #[serde(rename = "snowflake.database.schema.pipe")] + pub pipe: String, + + #[serde(rename = "snowflake.account")] + pub account: String, #[serde(rename = "snowflake.private.key")] pub private_key: String, @@ -53,6 +56,12 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.role")] pub role: String, + + #[serde(rename = "snowflake.jwt_token")] + pub jwt_token: String, + + #[serde(rename = "snowflake.s3")] + pub s3: String, } #[serde_as] From 805b6a23bb37af1513257f4431ac198a6c4e61ed Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 18:58:15 -0500 Subject: [PATCH 04/38] add snowflake_connector.rs --- src/connector/src/sink/mod.rs | 1 + src/connector/src/sink/snowflake_connector.rs | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 src/connector/src/sink/snowflake_connector.rs diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index c42457632d4c..ec18d38c7ea5 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -33,6 +33,7 @@ pub mod pulsar; pub mod redis; pub mod remote; pub mod snowflake; +pub mod snowflake_connector; pub mod starrocks; pub mod test_sink; pub mod trivial; diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs new file mode 100644 index 000000000000..e47a782e46e6 --- /dev/null +++ b/src/connector/src/sink/snowflake_connector.rs @@ -0,0 +1,51 @@ +use http::request::Builder; +use hyper::body::{Body, Sender}; +use hyper::client::HttpConnector; +use 
hyper::{body, Client, Request, StatusCode}; +use hyper_tls::HttpsConnector; + +use std::collections::HashMap; + +use super::{Result, SinkError}; + +const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; +const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; + +#[derive(Debug)] +pub struct SnowflakeInserterBuilder { + url: String, + header: HashMap, +} + +impl SnowflakeInserterBuilder { + pub fn new(account: String, db: String, schema: String, pipe: String, header: HashMap) -> Self { + // TODO: ensure if we need user to *explicitly* provide the request id + let url = format!("https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}", + account, + SNOWFLAKE_HOST_ADDR, + db, + schema. + pipe, + SNOWFLAKE_REQUEST_ID); + + Self { + url, + header, + } + } + + fn build_request_and_client() -> (Builder, Client>) { + + } + + pub async fn build(&self) -> Result { + + } +} + +#[derive(Debug)] +pub struct SnowflakeInserter { + sender: Option, + join_handle: Option>, + buffer: BytesMut, +} \ No newline at end of file From f0657e26f0926b733b7ff92e945697413811dfeb Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 19:09:09 -0500 Subject: [PATCH 05/38] add snowflake inserter (and builder) --- src/connector/src/sink/snowflake_connector.rs | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index e47a782e46e6..83eb260461e6 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -1,11 +1,14 @@ +use std::collections::HashMap; + +use bytes::{BufMut, Bytes, BytesMut}; use http::request::Builder; use hyper::body::{Body, Sender}; use hyper::client::HttpConnector; use hyper::{body, Client, Request, StatusCode}; use hyper_tls::HttpsConnector; +use tokio::task::JoinHandle; -use std::collections::HashMap; - +use super::doris_starrocks_connector::POOL_IDLE_TIMEOUT; use 
super::{Result, SinkError}; const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; @@ -18,34 +21,44 @@ pub struct SnowflakeInserterBuilder { } impl SnowflakeInserterBuilder { - pub fn new(account: String, db: String, schema: String, pipe: String, header: HashMap) -> Self { + pub fn new( + account: String, + db: String, + schema: String, + pipe: String, + header: HashMap, + ) -> Self { // TODO: ensure if we need user to *explicitly* provide the request id - let url = format!("https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}", - account, - SNOWFLAKE_HOST_ADDR, - db, - schema. - pipe, - SNOWFLAKE_REQUEST_ID); - - Self { - url, - header, - } + let url = format!( + "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}", + account, SNOWFLAKE_HOST_ADDR, db, schema, pipe, SNOWFLAKE_REQUEST_ID + ); + + Self { url, header } } - fn build_request_and_client() -> (Builder, Client>) { + fn build_request_and_client(&self) -> (Builder, Client>) { + let mut builder = Request::put(self.url.clone()); + for (k, v) in &self.header { + builder = builder.header(k, v); + } + + let connector = HttpsConnector::new(); + let client = Client::builder() + .pool_idle_timeout(POOL_IDLE_TIMEOUT) + .build(connector); + (builder, client) } pub async fn build(&self) -> Result { - + Err(SinkError::Snowflake("err!".to_string())) } } #[derive(Debug)] pub struct SnowflakeInserter { sender: Option, - join_handle: Option>, + join_handle: Option>>>, buffer: BytesMut, -} \ No newline at end of file +} From b6bdd340be14e5838cad2a20aeea58ef13638bb2 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 19:11:46 -0500 Subject: [PATCH 06/38] update license --- src/connector/src/sink/snowflake_connector.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 83eb260461e6..996ce47cedaf 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ 
b/src/connector/src/sink/snowflake_connector.rs @@ -1,3 +1,17 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::collections::HashMap; use bytes::{BufMut, Bytes, BytesMut}; @@ -52,6 +66,9 @@ impl SnowflakeInserterBuilder { } pub async fn build(&self) -> Result { + let (builder, client) = self.build_request_and_client(); + let (sender, body) = Body::channel(); + Err(SinkError::Snowflake("err!".to_string())) } } From 827a1d9fcaae8868c86074e0d29d870e81b0111f Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 19:50:34 -0500 Subject: [PATCH 07/38] add snowflake http client --- src/connector/src/sink/snowflake_connector.rs | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 996ce47cedaf..92836f5fa849 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -29,17 +29,19 @@ const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; #[derive(Debug)] -pub struct SnowflakeInserterBuilder { +pub struct SnowflakeHttpClient { url: String, + s3: String, header: HashMap, } -impl SnowflakeInserterBuilder { +impl SnowflakeHttpClient { pub fn new( account: String, db: String, schema: String, pipe: String, + s3: String, header: HashMap, ) -> Self { // TODO: ensure if we 
need user to *explicitly* provide the request id @@ -48,11 +50,11 @@ impl SnowflakeInserterBuilder { account, SNOWFLAKE_HOST_ADDR, db, schema, pipe, SNOWFLAKE_REQUEST_ID ); - Self { url, header } + Self { url, s3, header } } fn build_request_and_client(&self) -> (Builder, Client>) { - let mut builder = Request::put(self.url.clone()); + let mut builder = Request::post(self.url.clone()); for (k, v) in &self.header { builder = builder.header(k, v); } @@ -65,17 +67,17 @@ impl SnowflakeInserterBuilder { (builder, client) } - pub async fn build(&self) -> Result { + /// NOTE: this function should ONLY be called after + /// uploading files to remote external staged storage, e.g., AWS S3 + pub async fn send_request(&self) -> Result<()> { let (builder, client) = self.build_request_and_client(); - let (sender, body) = Body::channel(); - - Err(SinkError::Snowflake("err!".to_string())) + let request = builder + .body(Body::from(self.s3.clone())) + .map_err(|err| SinkError::Snowflake(err.to_string()))?; + let response = client.request(request).await.map_err(|err| SinkError::Snowflake(err.to_string()))?; + if response.status() != StatusCode::OK { + return Err(SinkError::Snowflake(format!("failed to make http request, error code: {}", response.status()))); + } + Ok(()) } } - -#[derive(Debug)] -pub struct SnowflakeInserter { - sender: Option, - join_handle: Option>>>, - buffer: BytesMut, -} From e19c3ea49ac21608b78f8d9f1dbd1024dc832fa9 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 19:50:55 -0500 Subject: [PATCH 08/38] update fmt --- src/connector/src/sink/snowflake_connector.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 92836f5fa849..15b56e10fe10 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -74,9 +74,15 @@ impl SnowflakeHttpClient { let request = builder 
.body(Body::from(self.s3.clone())) .map_err(|err| SinkError::Snowflake(err.to_string()))?; - let response = client.request(request).await.map_err(|err| SinkError::Snowflake(err.to_string()))?; + let response = client + .request(request) + .await + .map_err(|err| SinkError::Snowflake(err.to_string()))?; if response.status() != StatusCode::OK { - return Err(SinkError::Snowflake(format!("failed to make http request, error code: {}", response.status()))); + return Err(SinkError::Snowflake(format!( + "failed to make http request, error code: {}", + response.status() + ))); } Ok(()) } From 53ef2c5717ff8d2958fc43e8d61e76c0f5da6ff4 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 7 Mar 2024 19:51:21 -0500 Subject: [PATCH 09/38] remove redundant import --- src/connector/src/sink/snowflake_connector.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 15b56e10fe10..a09ad2359f7e 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -14,13 +14,11 @@ use std::collections::HashMap; -use bytes::{BufMut, Bytes, BytesMut}; use http::request::Builder; -use hyper::body::{Body, Sender}; +use hyper::body::Body; use hyper::client::HttpConnector; -use hyper::{body, Client, Request, StatusCode}; +use hyper::{Client, Request, StatusCode}; use hyper_tls::HttpsConnector; -use tokio::task::JoinHandle; use super::doris_starrocks_connector::POOL_IDLE_TIMEOUT; use super::{Result, SinkError}; From 25d3aef8547309e9d221068d3d928e1c3a7f485c Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Fri, 8 Mar 2024 20:04:34 -0500 Subject: [PATCH 10/38] add jwt_token auto-generation --- Cargo.lock | 1 + src/connector/Cargo.toml | 1 + src/connector/src/sink/snowflake.rs | 18 ++- src/connector/src/sink/snowflake_connector.rs | 106 +++++++++++++++++- 4 files changed, 114 insertions(+), 12 deletions(-) diff --git a/Cargo.lock 
b/Cargo.lock index d83a977aeb1e..25ca482a68c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9134,6 +9134,7 @@ dependencies = [ "itertools 0.12.0", "jni", "jsonschema-transpiler", + "jsonwebtoken 9.2.0", "madsim-rdkafka", "madsim-tokio", "madsim-tonic", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 1efdce6a2d87..d8f202ba3a9a 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -71,6 +71,7 @@ icelake = { workspace = true } indexmap = { version = "1.9.3", features = ["serde"] } itertools = "0.12" jni = { version = "0.21.1", features = ["invocation"] } +jsonwebtoken = "9.2.0" jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" } maplit = "1.0.2" moka = { version = "0.12", features = ["future"] } diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 0cb86176fc38..19bc9ddf689d 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -45,8 +45,14 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.database.schema.pipe")] pub pipe: String, - #[serde(rename = "snowflake.account")] - pub account: String, + #[serde(rename = "snowflake.account_identifier")] + pub account_identifier: String, + + #[serde(rename = "snowflake.user")] + pub user: String, + + #[serde(rename = "snowflake.rsa_public_key_fp")] + pub rsa_public_key_fp: String, #[serde(rename = "snowflake.private.key")] pub private_key: String, @@ -57,11 +63,11 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.role")] pub role: String, - #[serde(rename = "snowflake.jwt_token")] - pub jwt_token: String, + #[serde(rename = "snowflake.s3_bucket")] + pub s3_bucket: String, - #[serde(rename = "snowflake.s3")] - pub s3: String, + #[serde(rename = "snowflake.s3_file")] + pub s3_file: Option, } #[serde_as] diff --git a/src/connector/src/sink/snowflake_connector.rs 
b/src/connector/src/sink/snowflake_connector.rs index a09ad2359f7e..71b64aa7971a 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -13,12 +13,17 @@ // limitations under the License. use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; +use aws_config; +use aws_sdk_s3::{Client as S3Client, Error as S3Error}; use http::request::Builder; use hyper::body::Body; use hyper::client::HttpConnector; use hyper::{Client, Request, StatusCode}; use hyper_tls::HttpsConnector; +use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; +use serde::{Deserialize, Serialize}; use super::doris_starrocks_connector::POOL_IDLE_TIMEOUT; use super::{Result, SinkError}; @@ -26,29 +31,109 @@ use super::{Result, SinkError}; const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; +/// Claims is used when constructing `jwt_token` +/// with payload specified. +/// reference: https://docs.snowflake.com/en/developer-guide/sql-api/authenticating +#[derive(Debug, Serialize, Deserialize)] +struct Claims { + iss: String, + sub: String, + iat: usize, + exp: usize, +} + #[derive(Debug)] pub struct SnowflakeHttpClient { url: String, - s3: String, + rsa_public_key_fp: String, + s3_bucket: String, + s3_file: String, + account: String, + user: String, + private_key: String, header: HashMap, } impl SnowflakeHttpClient { pub fn new( account: String, + user: String, db: String, schema: String, pipe: String, - s3: String, + s3_bucket: String, + s3_file: String, + rsa_public_key_fp: String, + private_key: String, header: HashMap, ) -> Self { - // TODO: ensure if we need user to *explicitly* provide the request id + // TODO: ensure if we need user to *explicitly* provide the `request_id` let url = format!( "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}", - account, SNOWFLAKE_HOST_ADDR, db, schema, pipe, SNOWFLAKE_REQUEST_ID + account.clone(), + 
SNOWFLAKE_HOST_ADDR, + db, + schema, + pipe, + SNOWFLAKE_REQUEST_ID ); - Self { url, s3, header } + Self { + url, + rsa_public_key_fp, + s3_bucket, + s3_file, + account, + user, + private_key, + header, + } + } + + /// Generate a 59-minutes valid `jwt_token` for authentication of snowflake side + /// And please note that we will NOT strictly counting the time interval + /// of `jwt_token` expiration. + /// Which essentially means that this method should be called *every time* we want + /// to send `insertFiles` request to snowflake server + fn generate_jwt_token(&self) -> Result { + let header = Header::new(Algorithm::RS256); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as usize; + let lifetime = 59 * 60; + + // Ensure the account and username are uppercase + let account = self.account.to_uppercase(); + let user = self.user.to_uppercase(); + + // Construct the fully qualified username + let qualified_username = format!("{}.{}", account, user); + + let claims = Claims { + iss: format!("{}.{}", qualified_username.clone(), self.rsa_public_key_fp), + sub: qualified_username, + iat: now, + exp: now + lifetime, + }; + + let jwt_token = encode( + &header, + &claims, + &EncodingKey::from_rsa_pem(self.private_key.as_ref()).map_err(|err| { + SinkError::Snowflake(format!( + "failed to encode from provided rsa pem key, error: {}", + err.to_string() + )) + })?, + ) + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to encode jwt_token, error: {}", + err.to_string() + )) + })?; + Ok(jwt_token) } fn build_request_and_client(&self) -> (Builder, Client>) { @@ -69,13 +154,22 @@ impl SnowflakeHttpClient { /// uploading files to remote external staged storage, e.g., AWS S3 pub async fn send_request(&self) -> Result<()> { let (builder, client) = self.build_request_and_client(); + + // Generate the jwt_token + let jwt_token = self.generate_jwt_token()?; + builder + .header("Authorization", format!("Bearer {}", jwt_token)) + 
.header("X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT"); + let request = builder - .body(Body::from(self.s3.clone())) + .body(Body::from(self.s3_file.clone())) .map_err(|err| SinkError::Snowflake(err.to_string()))?; + let response = client .request(request) .await .map_err(|err| SinkError::Snowflake(err.to_string()))?; + if response.status() != StatusCode::OK { return Err(SinkError::Snowflake(format!( "failed to make http request, error code: {}", From fb0cade41ba9762217958fc1ac27ab9fb9631186 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 16:51:54 -0400 Subject: [PATCH 11/38] add SnowflakeS3Client --- src/connector/src/sink/snowflake.rs | 12 +++---- src/connector/src/sink/snowflake_connector.rs | 32 +++++++++++++------ 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 19bc9ddf689d..32ea0c282f98 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -27,6 +27,7 @@ use serde_with::serde_as; use with_options::WithOptions; use super::encoder::JsonEncoder; +use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; @@ -105,7 +106,7 @@ impl Sink for SnowflakeSink { } async fn validate(&self) -> Result<()> { - todo!() + Ok(()) } } @@ -114,7 +115,10 @@ pub struct SnowflakeSinkWriter { schema: Schema, pk_indices: Vec, is_append_only: bool, - client: Option, + /// the client used to send `insertFiles` post request + http_client: SnowflakeHttpClient, + /// the client to insert file to external storage (i.e., s3) + s3_client: SnowflakeS3Client, row_encoder: JsonEncoder, } @@ -155,7 +159,3 @@ impl SinkWriter for SnowflakeSinkWriter { Ok(()) } } - -pub struct SnowflakeClient {} - -impl SnowflakeClient {} diff --git 
a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 71b64aa7971a..7c26bd42c82a 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -46,8 +46,6 @@ struct Claims { pub struct SnowflakeHttpClient { url: String, rsa_public_key_fp: String, - s3_bucket: String, - s3_file: String, account: String, user: String, private_key: String, @@ -61,8 +59,6 @@ impl SnowflakeHttpClient { db: String, schema: String, pipe: String, - s3_bucket: String, - s3_file: String, rsa_public_key_fp: String, private_key: String, header: HashMap, @@ -81,8 +77,6 @@ impl SnowflakeHttpClient { Self { url, rsa_public_key_fp, - s3_bucket, - s3_file, account, user, private_key, @@ -150,18 +144,19 @@ impl SnowflakeHttpClient { (builder, client) } - /// NOTE: this function should ONLY be called after - /// uploading files to remote external staged storage, e.g., AWS S3 + /// NOTE: this function should ONLY be called *after* + /// uploading files to remote external staged storage, i.e., AWS S3 pub async fn send_request(&self) -> Result<()> { let (builder, client) = self.build_request_and_client(); // Generate the jwt_token let jwt_token = self.generate_jwt_token()?; - builder + let builder = builder .header("Authorization", format!("Bearer {}", jwt_token)) .header("X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT"); let request = builder + // TODO: ensure this .body(Body::from(self.s3_file.clone())) .map_err(|err| SinkError::Snowflake(err.to_string()))?; @@ -179,3 +174,22 @@ impl SnowflakeHttpClient { Ok(()) } } + +/// TODO(Zihao): refactor this part after s3 sink is available +pub struct SnowflakeS3Client { + s3_bucket: String, + s3_file: String, +} + +impl SnowflakeS3Client { + pub fn new(s3_bucket: String, s3_file: String) -> Self { + Self { + s3_bucket, + s3_file, + } + } + + pub fn sink_to_s3() -> Result<()> { + todo!() + } +} From cd4168cb9e0214b5f0e38a3339ad797be7e9c318 Mon 
Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 17:05:05 -0400 Subject: [PATCH 12/38] update SnowflakeSinkWriter --- src/connector/src/sink/snowflake.rs | 34 +++++++++++++++++++---------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 32ea0c282f98..608b4eed3b3c 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -110,18 +110,6 @@ impl Sink for SnowflakeSink { } } -pub struct SnowflakeSinkWriter { - pub config: SnowflakeSink, - schema: Schema, - pk_indices: Vec, - is_append_only: bool, - /// the client used to send `insertFiles` post request - http_client: SnowflakeHttpClient, - /// the client to insert file to external storage (i.e., s3) - s3_client: SnowflakeS3Client, - row_encoder: JsonEncoder, -} - impl TryFrom for SnowflakeSink { type Error = SinkError; @@ -137,6 +125,28 @@ impl TryFrom for SnowflakeSink { } } +pub struct SnowflakeSinkWriter { + config: SnowflakeConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + /// the client used to send `insertFiles` post request + http_client: SnowflakeHttpClient, + /// the client to insert file to external storage (i.e., s3) + s3_client: SnowflakeS3Client, +} + +impl SnowflakeSinkWriter { + pub fn new( + config: SnowflakeConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Self { + todo!() + } +} + #[async_trait] impl SinkWriter for SnowflakeSinkWriter { async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { From 7a9fdf944cd8d2a7b4a9813d8071732ef3a0f2cb Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 17:41:11 -0400 Subject: [PATCH 13/38] set three SinkWriter functions to return Ok --- src/connector/src/sink/snowflake.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 608b4eed3b3c..a62c6b083912 
100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -150,15 +150,7 @@ impl SnowflakeSinkWriter { #[async_trait] impl SinkWriter for SnowflakeSinkWriter { async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { - todo!() - } - - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - todo!() - } - - async fn barrier(&mut self, is_checkpoint: bool) -> Result { - todo!() + Ok(()) } async fn abort(&mut self) -> Result<()> { @@ -168,4 +160,12 @@ impl SinkWriter for SnowflakeSinkWriter { async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { Ok(()) } + + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + todo!() + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result { + todo!() + } } From db090a9704f4d82c6b1c44e257eba0c8477e8d98 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 18:31:14 -0400 Subject: [PATCH 14/38] add log sinker --- src/connector/src/sink/snowflake.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index a62c6b083912..2820a44341ba 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -30,6 +30,7 @@ use super::encoder::JsonEncoder; use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; +use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; @@ -102,7 +103,12 @@ impl Sink for SnowflakeSink { const SINK_NAME: &'static str = SNOWFLAKE_SINK; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - todo!() + Ok(SnowflakeSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + ).into_log_sink(writer_param.sink_metrics)) 
} async fn validate(&self) -> Result<()> { From 95310bf960e214581750acace7765b4f4660105a Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 22:45:32 -0400 Subject: [PATCH 15/38] basic sink functionality with json encoder --- src/connector/src/sink/snowflake.rs | 78 ++++++++++++++++--- src/connector/src/sink/snowflake_connector.rs | 51 +++++++++--- 2 files changed, 106 insertions(+), 23 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 2820a44341ba..cf39a082da4a 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -17,16 +17,17 @@ use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; -use risingwave_common::array::StreamChunk; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use serde::Deserialize; -use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; use with_options::WithOptions; -use super::encoder::JsonEncoder; +use super::encoder::{ + JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode, +}; use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; @@ -34,6 +35,7 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; +const MAX_BATCH_NUM: u32 = 1000000; // TODO: add comments #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { @@ -108,7 +110,9 @@ impl Sink for SnowflakeSink { self.schema.clone(), self.pk_indices.clone(), self.is_append_only, - ).into_log_sink(writer_param.sink_metrics)) + ) + .await + .into_log_sinker(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> { @@ -140,22 +144,74 @@ pub struct SnowflakeSinkWriter { http_client: SnowflakeHttpClient, /// the client to
insert file to external storage (i.e., s3) s3_client: SnowflakeS3Client, + row_encoder: JsonEncoder, + counter: u32, + payload: String, } impl SnowflakeSinkWriter { - pub fn new( + pub async fn new( config: SnowflakeConfig, schema: Schema, pk_indices: Vec, is_append_only: bool, ) -> Self { - todo!() + let http_client = SnowflakeHttpClient::new( + config.common.account_identifier.clone(), + config.common.user.clone(), + config.common.database.clone(), + config.common.schema.clone(), + config.common.pipe.clone(), + config.common.rsa_public_key_fp.clone(), + config.common.private_key.clone(), + HashMap::new(), + ); + + let s3_client = SnowflakeS3Client::new(config.common.s3_bucket.clone()).await; + + Self { + config, + schema: schema.clone(), + pk_indices, + is_append_only, + http_client, + s3_client, + row_encoder: JsonEncoder::new( + schema, + None, + super::encoder::DateHandlingMode::String, + TimestampHandlingMode::String, + TimestamptzHandlingMode::UtcString, + TimeHandlingMode::String, + ), + counter: 0, + payload: String::new(), + } + } + + fn reset(&mut self) { + self.payload.clear(); + self.counter = 0; + } + + async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); + self.payload.push_str(&row_json_string); + } + self.s3_client.sink_to_s3(self.payload.clone().into()).await?; + self.http_client.send_request().await?; + self.reset(); + Ok(()) } } #[async_trait] impl SinkWriter for SnowflakeSinkWriter { - async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { Ok(()) } @@ -167,11 +223,11 @@ impl SinkWriter for SnowflakeSinkWriter { Ok(()) } - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - todo!() + async fn barrier(&mut self, _is_checkpoint: bool) -> Result { + Ok(()) } - async fn barrier(&mut self, 
is_checkpoint: bool) -> Result { - todo!() + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + self.append_only(chunk).await } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 7c26bd42c82a..49863d08d9c2 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -16,8 +16,11 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; use aws_config; -use aws_sdk_s3::{Client as S3Client, Error as S3Error}; +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::Client as S3Client; +use bytes::Bytes; use http::request::Builder; +use http::header; use hyper::body::Body; use hyper::client::HttpConnector; use hyper::{Client, Request, StatusCode}; @@ -30,6 +33,7 @@ use super::{Result, SinkError}; const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; +const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; /// Claims is used when constructing `jwt_token` /// with payload specified. 
@@ -65,7 +69,7 @@ impl SnowflakeHttpClient { ) -> Self { // TODO: ensure if we need user to *explicitly* provide the `request_id` let url = format!( - "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}", + "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?requestId={}", account.clone(), SNOWFLAKE_HOST_ADDR, db, @@ -74,6 +78,8 @@ impl SnowflakeHttpClient { SNOWFLAKE_REQUEST_ID ); + println!("url: {}", url); + Self { url, rsa_public_key_fp, @@ -132,9 +138,6 @@ impl SnowflakeHttpClient { fn build_request_and_client(&self) -> (Builder, Client>) { let mut builder = Request::post(self.url.clone()); - for (k, v) in &self.header { - builder = builder.header(k, v); - } let connector = HttpsConnector::new(); let client = Client::builder() @@ -152,12 +155,16 @@ impl SnowflakeHttpClient { // Generate the jwt_token let jwt_token = self.generate_jwt_token()?; let builder = builder + .header(header::CONTENT_TYPE, "text/plain") .header("Authorization", format!("Bearer {}", jwt_token)) - .header("X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT"); + .header( + "X-Snowflake-Authorization-Token-Type".to_string(), + "KEYPAIR_JWT", + ); let request = builder // TODO: ensure this - .body(Body::from(self.s3_file.clone())) + .body(Body::from(S3_INTERMEDIATE_FILE_NAME)) .map_err(|err| SinkError::Snowflake(err.to_string()))?; let response = client @@ -171,6 +178,9 @@ impl SnowflakeHttpClient { response.status() ))); } + + println!("resp: {:#?}", response); + Ok(()) } } @@ -178,18 +188,35 @@ impl SnowflakeHttpClient { /// TODO(Zihao): refactor this part after s3 sink is available pub struct SnowflakeS3Client { s3_bucket: String, - s3_file: String, + s3_client: S3Client, } impl SnowflakeS3Client { - pub fn new(s3_bucket: String, s3_file: String) -> Self { + pub async fn new(s3_bucket: String) -> Self { + let config = aws_config::load_from_env().await; + let s3_client = S3Client::new(&config); + Self { s3_bucket, - s3_file, + s3_client, } } - pub fn sink_to_s3() -> 
Result<()> { - todo!() + pub async fn sink_to_s3(&self, data: Bytes) -> Result<()> { + self.s3_client + .put_object() + .bucket(self.s3_bucket.clone()) + .key(S3_INTERMEDIATE_FILE_NAME) + .body(ByteStream::from(data)) + .send() + .await + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to sink data to S3, error: {}", + err.to_string() + )) + })?; + + Ok(()) } } From e46b51c35d7b54dcb02944e7c0b1dd2437b88191 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 14:48:37 -0400 Subject: [PATCH 16/38] add comments && update sink_to_s3 --- src/connector/src/sink/snowflake.rs | 61 ++++++++++++------- src/connector/src/sink/snowflake_connector.rs | 11 ++-- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index cf39a082da4a..49804259fcfe 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -35,43 +35,46 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; -const MAX_BATCH_NUM: u32 = 1000000; +const MAX_BATCH_ROW_NUM: u32 = 1000; -// TODO: add comments #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { + /// The snowflake database used for sinking #[serde(rename = "snowflake.database")] pub database: String, - #[serde(rename = "snowflake.database.schema")] + /// The corresponding schema where sink table exists + #[serde(rename = "snowflake.schema")] pub schema: String, - #[serde(rename = "snowflake.database.schema.pipe")] + /// The created pipe object, will be used as `insertFiles` target + #[serde(rename = "snowflake.pipe")] pub pipe: String, + /// The unique, snowflake provided `account_identifier` + /// NOTE: please use the form `-` + /// For detailed guidance, reference: https://docs.snowflake.com/en/user-guide/admin-account-identifier #[serde(rename = 
"snowflake.account_identifier")] pub account_identifier: String, + /// The user that owns the table to be sinked + /// NOTE: the user should've been granted corresponding *role* + /// reference: https://docs.snowflake.com/en/sql-reference/sql/grant-role #[serde(rename = "snowflake.user")] pub user: String, + /// The public key fingerprint used when generating custom `jwt_token` + /// reference: https://docs.snowflake.com/en/developer-guide/sql-api/authenticating #[serde(rename = "snowflake.rsa_public_key_fp")] pub rsa_public_key_fp: String, - #[serde(rename = "snowflake.private.key")] + /// The rsa pem key *without* encrption + #[serde(rename = "snowflake.private_key")] pub private_key: String, - #[serde(rename = "snowflake.private.key.passphrase")] - pub private_key_passphrase: Option, - - #[serde(rename = "snowflake.role")] - pub role: String, - + /// The s3 bucket where intermediate sink files will be stored #[serde(rename = "snowflake.s3_bucket")] pub s3_bucket: String, - - #[serde(rename = "snowflake.s3_file")] - pub s3_file: Option, } #[serde_as] @@ -145,8 +148,9 @@ pub struct SnowflakeSinkWriter { /// the client to insert file to external storage (i.e., s3) s3_client: SnowflakeS3Client, row_encoder: JsonEncoder, - counter: u32, + row_counter: u32, payload: String, + sink_file_suffix: u32, } impl SnowflakeSinkWriter { @@ -184,14 +188,24 @@ impl SnowflakeSinkWriter { TimestamptzHandlingMode::UtcString, TimeHandlingMode::String, ), - counter: 0, + row_counter: 0, payload: String::new(), + // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` + sink_file_suffix: 0, } } fn reset(&mut self) { self.payload.clear(); - self.counter = 0; + self.row_counter = 0; + // Note that we shall NOT reset the `sink_file_suffix` + // since we need to incrementally keep the sink + // file *unique*, otherwise snowflake will not + // sink it from external stage (i.e., s3) + } + + fn at_sink_threshold(&self) { + self.counter >= MAX_BATCH_ROW_NUM } async fn append_only(&mut self, 
chunk: StreamChunk) -> Result<()> { @@ -201,10 +215,8 @@ impl SnowflakeSinkWriter { } let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); self.payload.push_str(&row_json_string); + self.row_counter += 1; } - self.s3_client.sink_to_s3(self.payload.clone().into()).await?; - self.http_client.send_request().await?; - self.reset(); Ok(()) } } @@ -228,6 +240,13 @@ impl SinkWriter for SnowflakeSinkWriter { } async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - self.append_only(chunk).await + self.append_only(chunk).await?; + + if self.at_sink_threshold() { + self.s3_client.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix).await?; + self.http_client.send_request().await?; + self.reset(); + self.sink_file_suffix += 1; + } } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 49863d08d9c2..c86ee6f3a19c 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -174,13 +174,12 @@ impl SnowflakeHttpClient { if response.status() != StatusCode::OK { return Err(SinkError::Snowflake(format!( - "failed to make http request, error code: {}", - response.status() + "failed to make http request, error code: {}\ndetailed response: {:#?}", + response.status(), + response, ))); } - println!("resp: {:#?}", response); - Ok(()) } } @@ -202,11 +201,11 @@ impl SnowflakeS3Client { } } - pub async fn sink_to_s3(&self, data: Bytes) -> Result<()> { + pub async fn sink_to_s3(&self, data: Bytes, file_num: u32) -> Result<()> { self.s3_client .put_object() .bucket(self.s3_bucket.clone()) - .key(S3_INTERMEDIATE_FILE_NAME) + .key(format!("{}{}", S3_INTERMEDIATE_FILE_NAME, file_num)) .body(ByteStream::from(data)) .send() .await From cd6f587444cf781c1366b1b123e5b890d629eb57 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 14:50:34 -0400 Subject: [PATCH 17/38] add file num to send_request --- 
src/connector/src/sink/snowflake.rs | 2 +- src/connector/src/sink/snowflake_connector.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 49804259fcfe..648d0fad284b 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -244,7 +244,7 @@ impl SinkWriter for SnowflakeSinkWriter { if self.at_sink_threshold() { self.s3_client.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix).await?; - self.http_client.send_request().await?; + self.http_client.send_request(self.sink_file_suffix).await?; self.reset(); self.sink_file_suffix += 1; } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index c86ee6f3a19c..003f068a6807 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -149,7 +149,7 @@ impl SnowflakeHttpClient { /// NOTE: this function should ONLY be called *after* /// uploading files to remote external staged storage, i.e., AWS S3 - pub async fn send_request(&self) -> Result<()> { + pub async fn send_request(&self, file_num: u32) -> Result<()> { let (builder, client) = self.build_request_and_client(); // Generate the jwt_token @@ -164,7 +164,7 @@ impl SnowflakeHttpClient { let request = builder // TODO: ensure this - .body(Body::from(S3_INTERMEDIATE_FILE_NAME)) + .body(Body::from(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num))) .map_err(|err| SinkError::Snowflake(err.to_string()))?; let response = client @@ -205,7 +205,7 @@ impl SnowflakeS3Client { self.s3_client .put_object() .bucket(self.s3_bucket.clone()) - .key(format!("{}{}", S3_INTERMEDIATE_FILE_NAME, file_num)) + .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num)) .body(ByteStream::from(data)) .send() .await From 4caa11fb25815a5da8fdd3150c604070b3aeb35d Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 14:52:20 -0400 
Subject: [PATCH 18/38] fix typo --- src/connector/src/sink/snowflake.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 648d0fad284b..694ad30efb5c 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -68,7 +68,7 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.rsa_public_key_fp")] pub rsa_public_key_fp: String, - /// The rsa pem key *without* encrption + /// The rsa pem key *without* encryption #[serde(rename = "snowflake.private_key")] pub private_key: String, From 00d548d9f65517127bb98dc73892f5f8525afdef Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 15:02:00 -0400 Subject: [PATCH 19/38] add aws credentials to prevent load_from_env --- src/connector/src/sink/snowflake.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 694ad30efb5c..d11424c8e8e1 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -75,6 +75,18 @@ pub struct SnowflakeCommon { /// The s3 bucket where intermediate sink files will be stored #[serde(rename = "snowflake.s3_bucket")] pub s3_bucket: String, + + /// s3 credentials + #[serde(rename = "snowflake.aws_access_key_id")] + pub aws_access_key_id: String, + + /// s3 credentials + #[serde(rename = "snowflake.aws_secret_access_key")] + pub aws_secret_access_key: String, + + /// The s3 region, e.g., us-east-2 + #[serde(rename = "snowflake.aws_region")] + pub aws_region: String, } #[serde_as] @@ -242,10 +254,15 @@ impl SinkWriter for SnowflakeSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { self.append_only(chunk).await?; + // When the number of row exceeds `MAX_BATCH_ROW_NUM` if self.at_sink_threshold() { + // first sink to the external stage provided by user (i.e., s3) 
self.s3_client.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix).await?; + // then trigger `insertFiles` post request to snowflake self.http_client.send_request(self.sink_file_suffix).await?; + // reset `payload` & `row_counter` self.reset(); + // to ensure s3 sink file unique self.sink_file_suffix += 1; } } From 805c44f8aba2bad37ed96015e25ad9a18d73b958 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 16:35:18 -0400 Subject: [PATCH 20/38] enable basic snowflake sink pipeline --- src/connector/src/sink/snowflake.rs | 13 +++++-- src/connector/src/sink/snowflake_connector.rs | 34 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index d11424c8e8e1..1806938bf5fc 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -183,7 +183,12 @@ impl SnowflakeSinkWriter { HashMap::new(), ); - let s3_client = SnowflakeS3Client::new(config.common.s3_bucket.clone()).await; + let s3_client = SnowflakeS3Client::new( + config.common.s3_bucket.clone(), + config.common.aws_access_key_id.clone(), + config.common.aws_secret_access_key.clone(), + config.common.aws_region.clone(), + ).await; Self { config, @@ -216,8 +221,8 @@ impl SnowflakeSinkWriter { // sink it from external stage (i.e., s3) } - fn at_sink_threshold(&self) { - self.counter >= MAX_BATCH_ROW_NUM + fn at_sink_threshold(&self) -> bool { + self.row_counter >= MAX_BATCH_ROW_NUM } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { @@ -265,5 +270,7 @@ impl SinkWriter for SnowflakeSinkWriter { // to ensure s3 sink file unique self.sink_file_suffix += 1; } + + Ok(()) } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 003f068a6807..634ab5e1db7a 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -18,6 +18,9 @@ use 
std::time::{SystemTime, UNIX_EPOCH}; use aws_config; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::Client as S3Client; +use aws_sdk_s3::config::Credentials; +use aws_types::region::Region; +use aws_config::meta::region::RegionProviderChain; use bytes::Bytes; use http::request::Builder; use http::header; @@ -78,8 +81,6 @@ impl SnowflakeHttpClient { SNOWFLAKE_REQUEST_ID ); - println!("url: {}", url); - Self { url, rsa_public_key_fp, @@ -137,7 +138,7 @@ impl SnowflakeHttpClient { } fn build_request_and_client(&self) -> (Builder, Client>) { - let mut builder = Request::post(self.url.clone()); + let builder = Request::post(self.url.clone()); let connector = HttpsConnector::new(); let client = Client::builder() @@ -191,8 +192,31 @@ pub struct SnowflakeS3Client { } impl SnowflakeS3Client { - pub async fn new(s3_bucket: String) -> Self { - let config = aws_config::load_from_env().await; + pub async fn new( + s3_bucket: String, + aws_access_key_id: String, + aws_secret_access_key: String, + aws_region: String, + ) -> Self { + let credentials = Credentials::new( + aws_access_key_id, + aws_secret_access_key, + // we don't allow temporary credentials + None, + None, + "rw_sink_to_s3_credentials", + ); + + let region = RegionProviderChain::first_try(Region::new(aws_region)) + .or_default_provider(); + + let config = aws_config::from_env() + .credentials_provider(credentials) + .region(region) + .load() + .await; + + // create the brand new s3 client used to sink files to s3 let s3_client = S3Client::new(&config); Self { From 5b26ccd046ec378714f4c1b57d7af5d4b1443910 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 16:37:58 -0400 Subject: [PATCH 21/38] improve format --- src/connector/src/sink/snowflake.rs | 7 +++++-- src/connector/src/sink/snowflake_connector.rs | 15 ++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 1806938bf5fc..45a6e3842a69 
100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -188,7 +188,8 @@ impl SnowflakeSinkWriter { config.common.aws_access_key_id.clone(), config.common.aws_secret_access_key.clone(), config.common.aws_region.clone(), - ).await; + ) + .await; Self { config, @@ -262,7 +263,9 @@ impl SinkWriter for SnowflakeSinkWriter { // When the number of row exceeds `MAX_BATCH_ROW_NUM` if self.at_sink_threshold() { // first sink to the external stage provided by user (i.e., s3) - self.s3_client.sink_to_s3(self.payload.clone().into(), self.sink_file_suffix).await?; + self.s3_client + .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) + .await?; // then trigger `insertFiles` post request to snowflake self.http_client.send_request(self.sink_file_suffix).await?; // reset `payload` & `row_counter` diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 634ab5e1db7a..20dc68d94a0b 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -16,14 +16,14 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; use aws_config; +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::config::Credentials; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::config::Credentials; use aws_types::region::Region; -use aws_config::meta::region::RegionProviderChain; use bytes::Bytes; -use http::request::Builder; use http::header; +use http::request::Builder; use hyper::body::Body; use hyper::client::HttpConnector; use hyper::{Client, Request, StatusCode}; @@ -164,8 +164,10 @@ impl SnowflakeHttpClient { ); let request = builder - // TODO: ensure this - .body(Body::from(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num))) + .body(Body::from(format!( + "{}_{}", + S3_INTERMEDIATE_FILE_NAME, file_num + ))) .map_err(|err| SinkError::Snowflake(err.to_string()))?; let 
response = client @@ -207,8 +209,7 @@ impl SnowflakeS3Client { "rw_sink_to_s3_credentials", ); - let region = RegionProviderChain::first_try(Region::new(aws_region)) - .or_default_provider(); + let region = RegionProviderChain::first_try(Region::new(aws_region)).or_default_provider(); let config = aws_config::from_env() .credentials_provider(credentials) From 8bc0bf48f5024dfc9b284d19e592dc5b12678651 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 16:39:55 -0400 Subject: [PATCH 22/38] update comment --- src/connector/src/sink/snowflake.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 45a6e3842a69..7a9f6a6d47b3 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -213,13 +213,13 @@ impl SnowflakeSinkWriter { } } + /// Note that we shall NOT reset the `sink_file_suffix` + /// since we need to incrementally keep the sink + /// file *unique*, otherwise snowflake will not + /// sink it from external stage (i.e., s3) fn reset(&mut self) { self.payload.clear(); self.row_counter = 0; - // Note that we shall NOT reset the `sink_file_suffix` - // since we need to incrementally keep the sink - // file *unique*, otherwise snowflake will not - // sink it from external stage (i.e., s3) } fn at_sink_threshold(&self) -> bool { From ce20818413864813dfb07fcdda4c5a319b9c3369 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 16:55:38 -0400 Subject: [PATCH 23/38] fix check --- src/connector/src/sink/snowflake.rs | 10 +++++----- src/connector/src/sink/snowflake_connector.rs | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 7a9f6a6d47b3..53a690e42966 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -53,18 +53,18 @@ pub struct SnowflakeCommon { /// The unique, 
snowflake provided `account_identifier` /// NOTE: please use the form `-` - /// For detailed guidance, reference: https://docs.snowflake.com/en/user-guide/admin-account-identifier + /// For detailed guidance, reference: #[serde(rename = "snowflake.account_identifier")] pub account_identifier: String, /// The user that owns the table to be sinked /// NOTE: the user should've been granted corresponding *role* - /// reference: https://docs.snowflake.com/en/sql-reference/sql/grant-role + /// reference: #[serde(rename = "snowflake.user")] pub user: String, /// The public key fingerprint used when generating custom `jwt_token` - /// reference: https://docs.snowflake.com/en/developer-guide/sql-api/authenticating + /// reference: #[serde(rename = "snowflake.rsa_public_key_fp")] pub rsa_public_key_fp: String, @@ -226,7 +226,7 @@ impl SnowflakeSinkWriter { self.row_counter >= MAX_BATCH_ROW_NUM } - async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { + fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { if op != Op::Insert { continue; @@ -258,7 +258,7 @@ impl SinkWriter for SnowflakeSinkWriter { } async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - self.append_only(chunk).await?; + self.append_only(chunk)?; // When the number of row exceeds `MAX_BATCH_ROW_NUM` if self.at_sink_threshold() { diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 20dc68d94a0b..4225ab46f3b5 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -40,7 +40,7 @@ const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; /// Claims is used when constructing `jwt_token` /// with payload specified. 
-/// reference: https://docs.snowflake.com/en/developer-guide/sql-api/authenticating +/// reference: #[derive(Debug, Serialize, Deserialize)] struct Claims { iss: String, @@ -237,7 +237,7 @@ impl SnowflakeS3Client { .map_err(|err| { SinkError::Snowflake(format!( "failed to sink data to S3, error: {}", - err.to_string() + err )) })?; From b7fa1bc717fdb533e76e8c7cc2bef825853ff014 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 17:01:22 -0400 Subject: [PATCH 24/38] fix fmt --- src/connector/src/sink/snowflake_connector.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 4225ab46f3b5..189883758751 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -235,10 +235,7 @@ impl SnowflakeS3Client { .send() .await .map_err(|err| { - SinkError::Snowflake(format!( - "failed to sink data to S3, error: {}", - err - )) + SinkError::Snowflake(format!("failed to sink data to S3, error: {}", err)) })?; Ok(()) From 5c7ac05a026c9f78a41ee8de87686f95030cff9b Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 12 Mar 2024 17:10:12 -0400 Subject: [PATCH 25/38] update fmt --- src/connector/src/sink/snowflake_connector.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 189883758751..3a63efa86d4a 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -124,15 +124,12 @@ impl SnowflakeHttpClient { &EncodingKey::from_rsa_pem(self.private_key.as_ref()).map_err(|err| { SinkError::Snowflake(format!( "failed to encode from provided rsa pem key, error: {}", - err.to_string() + err )) })?, ) .map_err(|err| { - SinkError::Snowflake(format!( - "failed to encode jwt_token, error: {}", - err.to_string() - )) + 
SinkError::Snowflake(format!("failed to encode jwt_token, error: {}", err)) })?; Ok(jwt_token) } From 287fa2c4c0315f20aa22d70242c0fcbad241a41e Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 14:17:54 -0400 Subject: [PATCH 26/38] make max_batch_row_num configurable --- src/connector/src/sink/snowflake.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 53a690e42966..eb3a191fecba 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -35,7 +35,6 @@ use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; -const MAX_BATCH_ROW_NUM: u32 = 1000; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct SnowflakeCommon { @@ -87,6 +86,9 @@ pub struct SnowflakeCommon { /// The s3 region, e.g., us-east-2 #[serde(rename = "snowflake.aws_region")] pub aws_region: String, + + #[serde(rename = "snowflake.max_batch_row_num")] + pub max_batch_row_num: String, } #[serde_as] @@ -162,6 +164,8 @@ pub struct SnowflakeSinkWriter { row_encoder: JsonEncoder, row_counter: u32, payload: String, + /// the threshold for sinking to s3 + max_batch_row_num: u32, sink_file_suffix: u32, } @@ -191,6 +195,8 @@ impl SnowflakeSinkWriter { ) .await; + let max_batch_row_num = config.common.max_batch_row_num.clone().parse::().expect("failed to parse `snowflake.max_batch_row_num` as a `u32`"); + Self { config, schema: schema.clone(), @@ -208,6 +214,7 @@ impl SnowflakeSinkWriter { ), row_counter: 0, payload: String::new(), + max_batch_row_num, // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` sink_file_suffix: 0, } @@ -223,7 +230,7 @@ impl SnowflakeSinkWriter { } fn at_sink_threshold(&self) -> bool { - self.row_counter >= MAX_BATCH_ROW_NUM + self.row_counter >= self.max_batch_row_num } fn append_only(&mut self, chunk: 
StreamChunk) -> Result<()> { From 4c10695c308c0d48113d7729315782acbb816024 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 14:18:27 -0400 Subject: [PATCH 27/38] update fmt --- src/connector/src/sink/snowflake.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index eb3a191fecba..86a510d81d5a 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -195,7 +195,12 @@ impl SnowflakeSinkWriter { ) .await; - let max_batch_row_num = config.common.max_batch_row_num.clone().parse::().expect("failed to parse `snowflake.max_batch_row_num` as a `u32`"); + let max_batch_row_num = config + .common + .max_batch_row_num + .clone() + .parse::() + .expect("failed to parse `snowflake.max_batch_row_num` as a `u32`"); Self { config, From ccfd6383ec3f30ec7399c113ff3fa7ea9d6d48fd Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 14:29:02 -0400 Subject: [PATCH 28/38] sink payload when checkpoint barrier comes --- src/connector/src/sink/snowflake.rs | 35 ++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 86a510d81d5a..9dc8d734a16a 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -87,6 +87,8 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.aws_region")] pub aws_region: String, + /// The configurable max row(s) to batch, + /// which should be explicitly specified by user(s) #[serde(rename = "snowflake.max_batch_row_num")] pub max_batch_row_num: String, } @@ -249,6 +251,22 @@ impl SnowflakeSinkWriter { } Ok(()) } + + /// Sink `payload` to s3, then trigger corresponding `insertFiles` post request + /// to snowflake, to finish the overall sinking pipeline. 
+ async fn sink_payload(&mut self) -> Result<()> { + // first sink to the external stage provided by user (i.e., s3) + self.s3_client + .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) + .await?; + // then trigger `insertFiles` post request to snowflake + self.http_client.send_request(self.sink_file_suffix).await?; + // reset `payload` & `row_counter` + self.reset(); + // to ensure s3 sink file unique + self.sink_file_suffix += 1; + Ok(()) + } } #[async_trait] @@ -265,7 +283,11 @@ impl SinkWriter for SnowflakeSinkWriter { Ok(()) } - async fn barrier(&mut self, _is_checkpoint: bool) -> Result { + async fn barrier(&mut self, is_checkpoint: bool) -> Result { + if is_checkpoint { + // sink all the row(s) currently batched in `self.payload` + self.sink_payload().await?; + } Ok(()) } @@ -274,16 +296,7 @@ impl SinkWriter for SnowflakeSinkWriter { // When the number of row exceeds `MAX_BATCH_ROW_NUM` if self.at_sink_threshold() { - // first sink to the external stage provided by user (i.e., s3) - self.s3_client - .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) - .await?; - // then trigger `insertFiles` post request to snowflake - self.http_client.send_request(self.sink_file_suffix).await?; - // reset `payload` & `row_counter` - self.reset(); - // to ensure s3 sink file unique - self.sink_file_suffix += 1; + self.sink_payload().await?; } Ok(()) From 7ddbdf8e24cc3f92a235a625d64c5c78d2fa0f69 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 14:31:13 -0400 Subject: [PATCH 29/38] update fmt --- src/connector/src/sink/snowflake.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 9dc8d734a16a..aaa41773cc36 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -257,8 +257,8 @@ impl SnowflakeSinkWriter { async fn sink_payload(&mut self) -> Result<()> { // first sink to the external stage provided 
by user (i.e., s3) self.s3_client - .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) - .await?; + .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) + .await?; // then trigger `insertFiles` post request to snowflake self.http_client.send_request(self.sink_file_suffix).await?; // reset `payload` & `row_counter` From 3fc56b992354eff5995e6e897a194782e4282cc5 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 14:52:53 -0400 Subject: [PATCH 30/38] use epoch with file_suffix as the unique identifier for sink file to s3 --- src/connector/src/sink/snowflake.rs | 22 ++++++++++++++++--- src/connector/src/sink/snowflake_connector.rs | 8 +++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index aaa41773cc36..89dba0e233d5 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -168,6 +168,8 @@ pub struct SnowflakeSinkWriter { payload: String, /// the threshold for sinking to s3 max_batch_row_num: u32, + /// The current epoch, used in naming the sink files + epoch: u64, sink_file_suffix: u32, } @@ -222,6 +224,7 @@ impl SnowflakeSinkWriter { row_counter: 0, payload: String::new(), max_batch_row_num, + epoch: 0, // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` sink_file_suffix: 0, } @@ -252,15 +255,27 @@ impl SnowflakeSinkWriter { Ok(()) } + fn update_epoch(&mut self, epoch: u64) { + self.epoch = epoch; + } + + /// Construct the file suffix for current sink + fn file_suffix(&self) -> String { + format!("{}_{}", self.epoch, self.sink_file_suffix) + } + /// Sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. 
async fn sink_payload(&mut self) -> Result<()> { + if self.payload.is_empty() { + return Ok(()); + } // first sink to the external stage provided by user (i.e., s3) self.s3_client - .sink_to_s3(self.payload.clone().into(), self.sink_file_suffix) + .sink_to_s3(self.payload.clone().into(), self.file_suffix()) .await?; // then trigger `insertFiles` post request to snowflake - self.http_client.send_request(self.sink_file_suffix).await?; + self.http_client.send_request(self.file_suffix()).await?; // reset `payload` & `row_counter` self.reset(); // to ensure s3 sink file unique @@ -271,7 +286,8 @@ impl SnowflakeSinkWriter { #[async_trait] impl SinkWriter for SnowflakeSinkWriter { - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + async fn begin_epoch(&mut self, epoch: u64) -> Result<()> { + self.update_epoch(epoch); Ok(()) } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 3a63efa86d4a..0ea32176971d 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -147,7 +147,7 @@ impl SnowflakeHttpClient { /// NOTE: this function should ONLY be called *after* /// uploading files to remote external staged storage, i.e., AWS S3 - pub async fn send_request(&self, file_num: u32) -> Result<()> { + pub async fn send_request(&self, file_suffix: String) -> Result<()> { let (builder, client) = self.build_request_and_client(); // Generate the jwt_token @@ -163,7 +163,7 @@ impl SnowflakeHttpClient { let request = builder .body(Body::from(format!( "{}_{}", - S3_INTERMEDIATE_FILE_NAME, file_num + S3_INTERMEDIATE_FILE_NAME, file_suffix ))) .map_err(|err| SinkError::Snowflake(err.to_string()))?; @@ -223,11 +223,11 @@ impl SnowflakeS3Client { } } - pub async fn sink_to_s3(&self, data: Bytes, file_num: u32) -> Result<()> { + pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { self.s3_client .put_object() 
.bucket(self.s3_bucket.clone()) - .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_num)) + .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_suffix)) .body(ByteStream::from(data)) .send() .await From 9ceab04877171536df5808ef73219c7596a6c43e Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 15:15:23 -0400 Subject: [PATCH 31/38] update validate to ensure append-only --- src/connector/src/sink/snowflake.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 89dba0e233d5..bdf889bf8d01 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -135,6 +135,11 @@ impl Sink for SnowflakeSink { } async fn validate(&self) -> Result<()> { + if !self.is_append_only { + return Err(SinkError::Config( + anyhow!("SnowflakeSink only supports append-only mode at present, please change the configuration accordingly to enable sinking.") + )); + } Ok(()) } } @@ -245,9 +250,7 @@ impl SnowflakeSinkWriter { fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } + assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`"); let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); self.payload.push_str(&row_json_string); self.row_counter += 1; @@ -259,7 +262,7 @@ impl SnowflakeSinkWriter { self.epoch = epoch; } - /// Construct the file suffix for current sink + /// Construct the *unique* file suffix for the sink fn file_suffix(&self) -> String { format!("{}_{}", self.epoch, self.sink_file_suffix) } From af5382137ee8ca4c8d184bd7390d345b0c372fb7 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 16:13:37 -0400 Subject: [PATCH 32/38] support s3_path for configuration --- src/connector/src/sink/snowflake.rs | 9 +++++++ src/connector/src/sink/snowflake_connector.rs | 25 ++++++++++++++++--- 2 files changed, 30 
insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index bdf889bf8d01..09a945ee2cba 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -75,6 +75,13 @@ pub struct SnowflakeCommon { #[serde(rename = "snowflake.s3_bucket")] pub s3_bucket: String, + /// The optional s3 path to be specified + /// the actual file location would be `:///` + /// if this field is specified by user(s) + /// otherwise it would be `://` + #[serde(rename = "snowflake.s3_path")] + pub s3_path: Option, + /// s3 credentials #[serde(rename = "snowflake.aws_access_key_id")] pub aws_access_key_id: String, @@ -194,10 +201,12 @@ impl SnowflakeSinkWriter { config.common.rsa_public_key_fp.clone(), config.common.private_key.clone(), HashMap::new(), + config.common.s3_path.clone(), ); let s3_client = SnowflakeS3Client::new( config.common.s3_bucket.clone(), + config.common.s3_path.clone(), config.common.aws_access_key_id.clone(), config.common.aws_secret_access_key.clone(), config.common.aws_region.clone(), diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 0ea32176971d..4e2e4a89a69c 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -38,6 +38,14 @@ const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; +/// The helper function to generate the s3 file name +fn generate_s3_file_name(s3_path: Option, suffix: String) -> String { + match s3_path { + Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix), + None => format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, suffix), + } +} + /// Claims is used when constructing `jwt_token` /// with payload specified. 
/// reference: @@ -57,6 +65,7 @@ pub struct SnowflakeHttpClient { user: String, private_key: String, header: HashMap, + s3_path: Option, } impl SnowflakeHttpClient { @@ -69,6 +78,7 @@ impl SnowflakeHttpClient { rsa_public_key_fp: String, private_key: String, header: HashMap, + s3_path: Option, ) -> Self { // TODO: ensure if we need user to *explicitly* provide the `request_id` let url = format!( @@ -88,6 +98,7 @@ impl SnowflakeHttpClient { user, private_key, header, + s3_path, } } @@ -161,9 +172,9 @@ impl SnowflakeHttpClient { ); let request = builder - .body(Body::from(format!( - "{}_{}", - S3_INTERMEDIATE_FILE_NAME, file_suffix + .body(Body::from(generate_s3_file_name( + self.s3_path.clone(), + file_suffix, ))) .map_err(|err| SinkError::Snowflake(err.to_string()))?; @@ -187,12 +198,14 @@ impl SnowflakeHttpClient { /// TODO(Zihao): refactor this part after s3 sink is available pub struct SnowflakeS3Client { s3_bucket: String, + s3_path: Option, s3_client: S3Client, } impl SnowflakeS3Client { pub async fn new( s3_bucket: String, + s3_path: Option, aws_access_key_id: String, aws_secret_access_key: String, aws_region: String, @@ -219,6 +232,7 @@ impl SnowflakeS3Client { Self { s3_bucket, + s3_path, s3_client, } } @@ -227,7 +241,10 @@ impl SnowflakeS3Client { self.s3_client .put_object() .bucket(self.s3_bucket.clone()) - .key(format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, file_suffix)) + .key(generate_s3_file_name( + self.s3_path.clone(), + file_suffix, + )) .body(ByteStream::from(data)) .send() .await From d877b149728941341f74c8ffe802aa2f69408c09 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 18 Mar 2024 16:14:08 -0400 Subject: [PATCH 33/38] update fmt --- src/connector/src/sink/snowflake_connector.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 4e2e4a89a69c..c019440793bc 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ 
b/src/connector/src/sink/snowflake_connector.rs @@ -241,10 +241,7 @@ impl SnowflakeS3Client { self.s3_client .put_object() .bucket(self.s3_bucket.clone()) - .key(generate_s3_file_name( - self.s3_path.clone(), - file_suffix, - )) + .key(generate_s3_file_name(self.s3_path.clone(), file_suffix)) .body(ByteStream::from(data)) .send() .await From dbc468a3da4d2603cd49f51e124abc1aafda93e6 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 4 Apr 2024 11:24:45 -0400 Subject: [PATCH 34/38] update comments --- src/connector/src/sink/snowflake_connector.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index c019440793bc..f1837a95822c 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -80,7 +80,8 @@ impl SnowflakeHttpClient { header: HashMap, s3_path: Option, ) -> Self { - // TODO: ensure if we need user to *explicitly* provide the `request_id` + // todo: ensure if we need user to *explicitly* provide the `request_id` + // currently it seems that this is not important let url = format!( "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?requestId={}", account.clone(), @@ -195,7 +196,7 @@ impl SnowflakeHttpClient { } } -/// TODO(Zihao): refactor this part after s3 sink is available +/// todo: refactor this part after s3 sink is available pub struct SnowflakeS3Client { s3_bucket: String, s3_path: Option, From 426cb61e420f00d74f30838a8c92722bdafb8a7f Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 4 Apr 2024 21:11:40 -0400 Subject: [PATCH 35/38] add reference to snowpipe rest api --- src/connector/src/sink/snowflake_connector.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index f1837a95822c..4f0c2741e2b8 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ 
b/src/connector/src/sink/snowflake_connector.rs @@ -82,6 +82,8 @@ impl SnowflakeHttpClient { ) -> Self { // todo: ensure if we need user to *explicitly* provide the `request_id` // currently it seems that this is not important + // reference to the snowpipe rest api is as below, i.e., + // let url = format!( "https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?requestId={}", account.clone(), From 4b257d574c06a08fe769cdcd7e380065088c17da Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 9 Apr 2024 11:48:40 -0400 Subject: [PATCH 36/38] update error msg & comments --- src/connector/src/sink/snowflake.rs | 4 ++-- src/connector/src/sink/snowflake_connector.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index 09a945ee2cba..bf8545d82300 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -95,7 +95,7 @@ pub struct SnowflakeCommon { pub aws_region: String, /// The configurable max row(s) to batch, - /// which should be explicitly specified by user(s) + /// which should be *explicitly* specified by user(s) #[serde(rename = "snowflake.max_batch_row_num")] pub max_batch_row_num: String, } @@ -144,7 +144,7 @@ impl Sink for SnowflakeSink { async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Config( - anyhow!("SnowflakeSink only supports append-only mode at present, please change the configuration accordingly to enable sinking.") + anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`") )); } Ok(()) diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 4f0c2741e2b8..2cc180f3c4fc 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -81,7 +81,7 @@ impl SnowflakeHttpClient { s3_path: Option, ) -> Self 
{ // todo: ensure if we need user to *explicitly* provide the `request_id` - // currently it seems that this is not important + // currently it seems that this is not important. // reference to the snowpipe rest api is as below, i.e., // let url = format!( @@ -109,7 +109,7 @@ impl SnowflakeHttpClient { /// And please note that we will NOT strictly counting the time interval /// of `jwt_token` expiration. /// Which essentially means that this method should be called *every time* we want - /// to send `insertFiles` request to snowflake server + /// to send `insertFiles` request to snowflake server. fn generate_jwt_token(&self) -> Result { let header = Header::new(Algorithm::RS256); let now = SystemTime::now() From 583964ac1691d5040c9bb222ac83c0356401cfdb Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Tue, 9 Apr 2024 20:34:38 -0400 Subject: [PATCH 37/38] update with_options_sink accordingly --- src/connector/with_options_sink.yaml | 54 ++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index b287bcd6aa4b..07da6a36a0e3 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -528,6 +528,60 @@ RedisConfig: - name: redis.url field_type: String required: true +SnowflakeConfig: + fields: + - name: snowflake.database + field_type: String + comments: The snowflake database used for sinking + required: true + - name: snowflake.schema + field_type: String + comments: The corresponding schema where sink table exists + required: true + - name: snowflake.pipe + field_type: String + comments: The created pipe object, will be used as `insertFiles` target + required: true + - name: snowflake.account_identifier + field_type: String + comments: 'The unique, snowflake provided `account_identifier` NOTE: please use the form `-` For detailed guidance, reference: ' + required: true + - name: snowflake.user + field_type: String + comments: 'The user 
that owns the table to be sinked NOTE: the user should''ve been granted corresponding *role* reference: ' + required: true + - name: snowflake.rsa_public_key_fp + field_type: String + comments: 'The public key fingerprint used when generating custom `jwt_token` reference: ' + required: true + - name: snowflake.private_key + field_type: String + comments: The rsa pem key *without* encryption + required: true + - name: snowflake.s3_bucket + field_type: String + comments: The s3 bucket where intermediate sink files will be stored + required: true + - name: snowflake.s3_path + field_type: String + comments: The optional s3 path to be specified the actual file location would be `:///` if this field is specified by user(s) otherwise it would be `://` + required: false + - name: snowflake.aws_access_key_id + field_type: String + comments: s3 credentials + required: true + - name: snowflake.aws_secret_access_key + field_type: String + comments: s3 credentials + required: true + - name: snowflake.aws_region + field_type: String + comments: The s3 region, e.g., us-east-2 + required: true + - name: snowflake.max_batch_row_num + field_type: String + comments: The configurable max row(s) to batch, which should be *explicitly* specified by user(s) + required: true StarrocksConfig: fields: - name: starrocks.host From 9e52dc219efbe54c9a35c4e7e3bce3961a2245bb Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Wed, 10 Apr 2024 10:05:30 -0400 Subject: [PATCH 38/38] use uuid to ensure the global uniqueness of file suffix --- Cargo.lock | 2 +- src/connector/src/sink/snowflake.rs | 29 +++++++++++-------- src/connector/src/sink/snowflake_connector.rs | 2 +- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 086d0a13dabb..d400abdc8044 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9586,7 +9586,7 @@ dependencies = [ "itertools 0.12.1", "jni", "jsonschema-transpiler", - "jsonwebtoken 9.2.0", + "jsonwebtoken 9.3.0", "madsim-rdkafka", "madsim-tokio", 
"madsim-tonic", diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index bf8545d82300..ba0973a0b014 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::Schema; use serde::Deserialize; use serde_json::Value; use serde_with::serde_as; +use uuid::Uuid; use with_options::WithOptions; use super::encoder::{ @@ -181,8 +182,8 @@ pub struct SnowflakeSinkWriter { /// the threshold for sinking to s3 max_batch_row_num: u32, /// The current epoch, used in naming the sink files + /// mainly used for debugging purpose epoch: u64, - sink_file_suffix: u32, } impl SnowflakeSinkWriter { @@ -238,16 +239,13 @@ impl SnowflakeSinkWriter { row_counter: 0, payload: String::new(), max_batch_row_num, + // initial value of `epoch` will start from 0 epoch: 0, - // Start from 0, i.e., `RW_SNOWFLAKE_S3_SINK_FILE_0` - sink_file_suffix: 0, } } - /// Note that we shall NOT reset the `sink_file_suffix` - /// since we need to incrementally keep the sink - /// file *unique*, otherwise snowflake will not - /// sink it from external stage (i.e., s3) + /// reset the `payload` and `row_counter`. + /// shall *only* be called after a successful sink. fn reset(&mut self) { self.payload.clear(); self.row_counter = 0; @@ -271,17 +269,26 @@ impl SnowflakeSinkWriter { self.epoch = epoch; } - /// Construct the *unique* file suffix for the sink + /// generate a *global unique* uuid, + /// which is the key to the uniqueness of file suffix. + fn gen_uuid() -> Uuid { + Uuid::new_v4() + } + + /// construct the *global unique* file suffix for the sink. + /// note: this is unique even across multiple parallel writer(s). 
fn file_suffix(&self) -> String { - format!("{}_{}", self.epoch, self.sink_file_suffix) + // the format of suffix will be _ + format!("{}_{}", self.epoch, Self::gen_uuid()) } - /// Sink `payload` to s3, then trigger corresponding `insertFiles` post request + /// sink `payload` to s3, then trigger corresponding `insertFiles` post request /// to snowflake, to finish the overall sinking pipeline. async fn sink_payload(&mut self) -> Result<()> { if self.payload.is_empty() { return Ok(()); } + // todo: change this to streaming upload // first sink to the external stage provided by user (i.e., s3) self.s3_client .sink_to_s3(self.payload.clone().into(), self.file_suffix()) @@ -290,8 +297,6 @@ impl SnowflakeSinkWriter { self.http_client.send_request(self.file_suffix()).await?; // reset `payload` & `row_counter` self.reset(); - // to ensure s3 sink file unique - self.sink_file_suffix += 1; Ok(()) } } diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 2cc180f3c4fc..e5e37deb1465 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -38,7 +38,7 @@ const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com"; const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK"; const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE"; -/// The helper function to generate the s3 file name +/// The helper function to generate the *global unique* s3 file name. fn generate_s3_file_name(s3_path: Option, suffix: String) -> String { match s3_path { Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix),