
feat(sink): implement snowflake sink #15429

Merged
merged 40 commits into main from xzhseh/snowflake-sink on Apr 10, 2024
Changes from 5 commits
40 commits
82c03f9
basic structure for snowflake sink
xzhseh Mar 4, 2024
03c5aa0
fix format
xzhseh Mar 6, 2024
4f7cac9
Merge branch 'main' into xzhseh/snowflake-sink
xzhseh Mar 6, 2024
e9da466
update snowflake common
xzhseh Mar 7, 2024
805b6a2
add snowflake_connector.rs
xzhseh Mar 7, 2024
f0657e2
add snowflake inserter (and builder)
xzhseh Mar 8, 2024
b6bdd34
update license
xzhseh Mar 8, 2024
827a1d9
add snowflake http client
xzhseh Mar 8, 2024
e19c3ea
update fmt
xzhseh Mar 8, 2024
53ef2c5
remove redundant import
xzhseh Mar 8, 2024
25d3aef
add jwt_token auto-generation
xzhseh Mar 9, 2024
fb0cade
add SnowflakeS3Client
xzhseh Mar 11, 2024
cd4168c
update SnowflakeSinkWriter
xzhseh Mar 11, 2024
7a9fdf9
set three SinkWriter functions to return Ok
xzhseh Mar 11, 2024
db090a9
add log sinker
xzhseh Mar 11, 2024
95310bf
basic sink funtionality with json encoder
xzhseh Mar 12, 2024
e46b51c
add comments && update sink_to_s3
xzhseh Mar 12, 2024
cd6f587
add file num to send_request
xzhseh Mar 12, 2024
4caa11f
fix typo
xzhseh Mar 12, 2024
00d548d
add aws credentials to prevent load_from_env
xzhseh Mar 12, 2024
805c44f
enable basic snowflake sink pipeline
xzhseh Mar 12, 2024
5b26ccd
improve format
xzhseh Mar 12, 2024
8bc0bf4
update comment
xzhseh Mar 12, 2024
ce20818
fix check
xzhseh Mar 12, 2024
b7fa1bc
fix fmt
xzhseh Mar 12, 2024
5c7ac05
update fmt
xzhseh Mar 12, 2024
287fa2c
make max_batch_row_num configurable
xzhseh Mar 18, 2024
4c10695
update fmt
xzhseh Mar 18, 2024
ccfd638
sink payload when checkpoint barrier comes
xzhseh Mar 18, 2024
7ddbdf8
update fmt
xzhseh Mar 18, 2024
3fc56b9
use epoch with file_suffix as the unique identifier for sink file to s3
xzhseh Mar 18, 2024
9ceab04
update validate to ensure append-only
xzhseh Mar 18, 2024
af53821
support s3_path for configuration
xzhseh Mar 18, 2024
d877b14
udpate fmt
xzhseh Mar 18, 2024
dbc468a
update comments
xzhseh Apr 4, 2024
426cb61
add reference to snowpipe rest api
xzhseh Apr 5, 2024
4b257d5
update error msg & comments
xzhseh Apr 9, 2024
6373fdc
Merge branch 'main' into xzhseh/snowflake-sink
xzhseh Apr 9, 2024
583964a
update with_options_sink accordingly
xzhseh Apr 10, 2024
9e52dc2
use uuid to ensure the global uniqueness of file suffix
xzhseh Apr 10, 2024
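
The last two commits name each S3 file by epoch plus a unique suffix, so repeated flushes within one epoch never collide on a key. A std-only sketch of the idea (an atomic counter stands in here for the real UUID the final commit uses; names are hypothetical):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for a UUID: a process-wide monotonically increasing counter.
// The PR's final commit switches to a real UUID for global uniqueness
// across writers and nodes.
static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);

fn s3_file_name(epoch: u64) -> String {
    let suffix = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    // epoch identifies the checkpoint, suffix disambiguates files within it
    format!("{epoch}_{suffix}")
}

fn main() {
    let a = s3_file_name(42);
    let b = s3_file_name(42);
    assert_ne!(a, b); // same epoch, distinct file names
    assert!(a.starts_with("42_"));
}
```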
5 changes: 5 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -32,6 +32,8 @@ pub mod nats;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake;
Contributor:

May have a separate folder `snowflake` to hold the files related to Snowflake.

Contributor (author):

Currently we only have `snowflake.rs` for the core sinking logic, plus `snowflake_connector.rs` for the helper clients (i.e., the REST API client and the S3 client). Let's keep it simple at present and move things around when it gets bigger in the future.

pub mod snowflake_connector;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
@@ -88,6 +90,7 @@ macro_rules! for_all_sinks {
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ Snowflake, $crate::sink::snowflake::SnowflakeSink },
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
@@ -515,6 +518,8 @@ pub enum SinkError {
),
#[error("Starrocks error: {0}")]
Starrocks(String),
#[error("Snowflake error: {0}")]
Snowflake(String),
#[error("Pulsar error: {0}")]
Pulsar(
#[source]
155 changes: 155 additions & 0 deletions src/connector/src/sink/snowflake.rs
@@ -0,0 +1,155 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_derive::Serialize;
use serde_json::Value;
use serde_with::serde_as;
use with_options::WithOptions;

use super::encoder::JsonEncoder;
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

pub const SNOWFLAKE_SINK: &str = "snowflake";

// TODO: add comments
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct SnowflakeCommon {
#[serde(rename = "snowflake.database")]
pub database: String,

#[serde(rename = "snowflake.database.schema")]
pub schema: String,
Contributor:

Can't we query the schema from Snowflake?

Contributor (author):

I haven't found a way to query the schema from Snowflake, but the schema here is just the schema name, which will be used to identify which pipe to sink to when sending the insertFiles POST request.

Contributor:

You can access the table schema via https://docs.snowflake.com/en/sql-reference/info-schema/tables by selecting from information_schema.tables, IIUC.

Contributor (author) (xzhseh, Mar 15, 2024):

Do we have the ability to query the schema from an external REST API request? 🤔
This is more like a user-side query, IIUC.

Contributor (neverchanje, Mar 18, 2024):

You may have to use a Snowflake Rust driver, like https://docs.rs/snowflake-api/latest/snowflake_api/, which is based on the SQL REST API, and issue a query to information_schema.tables:

    let mut api = SnowflakeApi::with_password_auth(
        "ACCOUNT_IDENTIFIER",
        Some("WAREHOUSE"),
        Some("DATABASE"),
        Some("SCHEMA"),
        "USERNAME",
        Some("ROLE"),
        "PASSWORD",
    )?;
    let res = api.exec("select * from information_schema.tables").await?;

It's just an idea; I never tried it.

Member:

Not sure whether this (querying the schema via an external REST API request) can be done with the same credentials provided below.

Asking users to provide a schema also LGTM.

Contributor (author):

> Asking users to provide a schema also LGTM.

Providing an entire schema seems impossible; the schema here is for insertFiles to correctly find the corresponding database.schema.pipe to sink to.

#[serde(rename = "snowflake.database.schema.pipe")]
pub pipe: String,

#[serde(rename = "snowflake.account")]
pub account: String,

#[serde(rename = "snowflake.private.key")]
pub private_key: String,

#[serde(rename = "snowflake.private.key.passphrase")]
pub private_key_passphrase: Option<String>,

#[serde(rename = "snowflake.role")]
pub role: String,

#[serde(rename = "snowflake.jwt_token")]
pub jwt_token: String,

#[serde(rename = "snowflake.s3")]
pub s3: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct SnowflakeConfig {
#[serde(flatten)]
pub common: SnowflakeCommon,
}

impl SnowflakeConfig {
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
let config =
serde_json::from_value::<SnowflakeConfig>(serde_json::to_value(properties).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Ok(config)
}
}
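
Conceptually, `from_hashmap` round-trips the flat `WITH` options through `serde_json` so the `snowflake.*` keys land on the renamed `SnowflakeCommon` fields, turning any deserialization failure into `SinkError::Config`. A dependency-free sketch of the same required-key lookup (the `require` helper is hypothetical, not the PR's code):

```rust
use std::collections::HashMap;

// Hand-rolled stand-in for the serde round-trip in `from_hashmap`:
// look up one `snowflake.*` key and produce a config-style error
// message when a required option is missing.
fn require(props: &HashMap<String, String>, key: &str) -> Result<String, String> {
    props
        .get(key)
        .cloned()
        .ok_or_else(|| format!("missing required sink option `{key}`"))
}

fn main() -> Result<(), String> {
    let mut props = HashMap::new();
    props.insert("snowflake.database".to_string(), "mydb".to_string());

    // present key deserializes into the config field
    let database = require(&props, "snowflake.database")?;
    assert_eq!(database, "mydb");

    // absent key surfaces as a config error instead of a panic
    assert!(require(&props, "snowflake.account").is_err());
    Ok(())
}
```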

#[derive(Debug)]
pub struct SnowflakeSink {
pub config: SnowflakeConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
}

impl Sink for SnowflakeSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = LogSinkerOf<SnowflakeSinkWriter>;

const SINK_NAME: &'static str = SNOWFLAKE_SINK;

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
todo!()
}

async fn validate(&self) -> Result<()> {
todo!()
}
}

pub struct SnowflakeSinkWriter {
pub config: SnowflakeSink,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
client: Option<SnowflakeClient>,
row_encoder: JsonEncoder,
}

impl TryFrom<SinkParam> for SnowflakeSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = SnowflakeConfig::from_hashmap(param.properties)?;
Ok(SnowflakeSink {
config,
schema,
pk_indices: param.downstream_pk,
is_append_only: param.sink_type.is_append_only(),
})
}
}

#[async_trait]
impl SinkWriter for SnowflakeSinkWriter {
async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
todo!()
}

async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
todo!()
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
todo!()
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
Ok(())
}
}
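
Later commits in this PR make the writer buffer rows and flush only when a checkpoint barrier arrives ("sink payload when checkpoint barrier comes"). A minimal std-only sketch of that `begin_epoch` / `write_batch` / `barrier` lifecycle (hypothetical types, not the PR's implementation):

```rust
// Sketch: buffer encoded rows per epoch and flush only on checkpoint
// barriers, so a replay after failure never leaves partial files behind.
struct BufferingWriter {
    epoch: u64,
    buffer: Vec<String>,
    flushed: Vec<(u64, Vec<String>)>, // (epoch, payload) pairs "sent to S3"
}

impl BufferingWriter {
    fn begin_epoch(&mut self, epoch: u64) {
        self.epoch = epoch;
    }

    fn write_batch(&mut self, rows: Vec<String>) {
        self.buffer.extend(rows);
    }

    fn barrier(&mut self, is_checkpoint: bool) {
        // Non-checkpoint barriers keep buffering; checkpoints flush.
        if is_checkpoint && !self.buffer.is_empty() {
            let payload = std::mem::take(&mut self.buffer);
            self.flushed.push((self.epoch, payload));
        }
    }
}

fn main() {
    let mut w = BufferingWriter { epoch: 0, buffer: vec![], flushed: vec![] };
    w.begin_epoch(1);
    w.write_batch(vec!["{\"v\":1}".to_string()]);
    w.barrier(false); // plain barrier: nothing leaves the buffer
    assert!(w.flushed.is_empty());
    w.barrier(true); // checkpoint: the epoch's rows are flushed together
    assert_eq!(w.flushed.len(), 1);
    assert_eq!(w.flushed[0].0, 1);
}
```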

pub struct SnowflakeClient {}

impl SnowflakeClient {}
51 changes: 51 additions & 0 deletions src/connector/src/sink/snowflake_connector.rs
@@ -0,0 +1,51 @@
use http::request::Builder;
xzhseh marked this conversation as resolved.
use hyper::body::{Body, Sender};
use hyper::client::HttpConnector;
use hyper::{body, Client, Request, StatusCode};
use hyper_tls::HttpsConnector;

use std::collections::HashMap;

use bytes::BytesMut;
use tokio::task::JoinHandle;

use super::{Result, SinkError};

const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com";
const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK";

#[derive(Debug)]
pub struct SnowflakeInserterBuilder {
url: String,
header: HashMap<String, String>,
}

impl SnowflakeInserterBuilder {
pub fn new(account: String, db: String, schema: String, pipe: String, header: HashMap<String, String>) -> Self {
// TODO: ensure if we need user to *explicitly* provide the request id
let url = format!("https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}",
account,
SNOWFLAKE_HOST_ADDR,
db,
schema,
pipe,
SNOWFLAKE_REQUEST_ID);

Self {
url,
header,
}
}

fn build_request_and_client() -> (Builder, Client<HttpsConnector<HttpConnector>>) {
    todo!()
}

pub async fn build(&self) -> Result<SnowflakeInserter> {
    todo!()
}
}
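
The URL assembled in `new` above follows the Snowpipe insertFiles layout, where `db.schema.pipe` selects the target pipe. A standalone sketch mirroring that `format!` call (values are hypothetical; the real builder also carries auth headers):

```rust
// Sketch of the insertFiles endpoint construction from `new` above:
// the schema name is part of the fully-qualified pipe identifier.
fn insert_files_url(account: &str, db: &str, schema: &str, pipe: &str) -> String {
    format!(
        "https://{}.snowflakecomputing.com/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}",
        account, db, schema, pipe, "RW_SNOWFLAKE_SINK"
    )
}

fn main() {
    let url = insert_files_url("my_account", "my_db", "public", "rw_pipe");
    // the pipe path embeds database, schema, and pipe in order
    assert!(url.contains("/v1/data/pipes/my_db.public.rw_pipe/insertFiles"));
    println!("{url}");
}
```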

#[derive(Debug)]
pub struct SnowflakeInserter {
    sender: Option<Sender>,
    join_handle: Option<JoinHandle<Result<Vec<u8>>>>,
    buffer: BytesMut,
}