Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(steam): support stream change log #17132

Merged
merged 16 commits into from
Jun 26, 2024
Merged

feat(steam): support stream change log #17132

merged 16 commits into from
Jun 26, 2024

Conversation

xxhZs
Copy link
Contributor

@xxhZs xxhZs commented Jun 5, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

add an changed log exec
use cte to create changed log

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

in this pr, we support the stream changelog. It can send the 'op' on the stream downstream as a separate column

After this operation, we will discard the upstream pk and recreate a self-incrementing id _change_log_row_id. And the table upstream of changelog cannot have the same name as changelog_op.
Syntactically, we use a cte-like syntax
example

CREATE TABLE t1(v1 int, v2 int);
CREATE MATERIALIZED VIEW mv1 AS WITH sub AS changelog from t1 SELECT * FROM sub;
INSERT INTO t1 VALUES(1,1),(2,2);
UPDATE t1 SET v2 = 100 WHERE v1 = 1;
DELETE FROM t1 WHERE v1 = 2;
SELECT * FROM mv1; 

the result is

v1 v2 changelog_op
1 1 1
2 2 1
2 2 4
2 100 3
2 2 2

xxhZs added 4 commits May 28, 2024 14:17
save
save

save

fi

fmt

support
fix
@xxhZs xxhZs marked this pull request as ready for review June 5, 2024 10:49
@xxhZs xxhZs requested review from chenzl25 and hzxa21 June 5, 2024 10:49
@xxhZs xxhZs force-pushed the xxh/stream-subscription branch 2 times, most recently from ba7bc10 to d940439 Compare June 6, 2024 03:29
fix ci

fmt

fix ci
input: PlanRef,
input_col_change: ColIndexMapping,
) -> (Self, ColIndexMapping) {
let changed_log = Self::new(input, self.core.need_op, true);
Copy link
Contributor

@chenzl25 chenzl25 Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this logic to logical_rewrite_for_stream, because it has done more things than rewrite_with_input should contain.

@hzxa21 hzxa21 added the user-facing-changes Contains changes that are visible to users label Jun 12, 2024
e2e_test/streaming/changed_log.slt Outdated Show resolved Hide resolved
@@ -287,6 +287,10 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangedLogNode {
bool need_op = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is used for projection push-down to prune out op column if it is not used in downstream. Can we have some documentation to explain in which case need_op is false?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add documentation here in the pb in addition to the rust codes as well?

src/frontend/src/binder/query.rs Outdated Show resolved Hide resolved
src/sqlparser/src/ast/query.rs Outdated Show resolved Hide resolved
src/frontend/src/binder/query.rs Outdated Show resolved Hide resolved
src/sqlparser/tests/sqlparser_common.rs Outdated Show resolved Hide resolved
src/stream/src/executor/changed_log.rs Outdated Show resolved Hide resolved
src/stream/src/executor/changed_log.rs Outdated Show resolved Hide resolved
src/stream/src/executor/changed_log.rs Outdated Show resolved Hide resolved
@@ -133,7 +133,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
let from = None;
let cte = Cte {
alias: alias.clone(),
query,
query: Some(query),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How hard it is to support changelog cte in sqlsmith as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do it in next pr

@hzxa21
Copy link
Collaborator

hzxa21 commented Jun 13, 2024

I did a simple test with this PR:

dev=> create table t (v1 int primary key, v2 int);
CREATE_TABLE
dev=> create materialized view mv as select * from t;
CREATE_MATERIALIZED_VIEW
dev=> create materialized view changelog as with c as changedlog from mv select * from c;
CREATE_MATERIALIZED_VIEW

dev=> describe changelog;
        Name         |          Type           | Is Hidden | Description 
---------------------+-------------------------+-----------+-------------
 v1                  | integer                 | false     | 
 v2                  | integer                 | false     | 
 op                  | smallint                | false     | 
 _changed_log_row_id | serial                  | true      | 
 primary key         | v1, _changed_log_row_id |           | 
 distribution key    | _changed_log_row_id     |           | 
 table description   | changelog               |           | 
(7 rows)

Is it expected that the pk of changelog mv is v1, _changed_log_row_id instead of just _changed_log_row_id?

@xxhZs
Copy link
Contributor Author

xxhZs commented Jun 14, 2024

I did a simple test with this PR:

dev=> create table t (v1 int primary key, v2 int);
CREATE_TABLE
dev=> create materialized view mv as select * from t;
CREATE_MATERIALIZED_VIEW
dev=> create materialized view changelog as with c as changedlog from mv select * from c;
CREATE_MATERIALIZED_VIEW

dev=> describe changelog;
        Name         |          Type           | Is Hidden | Description 
---------------------+-------------------------+-----------+-------------
 v1                  | integer                 | false     | 
 v2                  | integer                 | false     | 
 op                  | smallint                | false     | 
 _changed_log_row_id | serial                  | true      | 
 primary key         | v1, _changed_log_row_id |           | 
 distribution key    | _changed_log_row_id     |           | 
 table description   | changelog               |           | 
(7 rows)

Is it expected that the pk of changelog mv is v1, _changed_log_row_id instead of just _changed_log_row_id?

fixed ,now pk is only _changed_log_row_id

let keys = vec![self.schema().len() - 1];
Some(keys)
} else {
None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, change_log's stream key must be not None and must be its row_id. In which case will we reach this branch?

@hzxa21
Copy link
Collaborator

hzxa21 commented Jun 17, 2024

Tried with another example:

dev=> create table t (v1 int primary key, v2 int);
CREATE_TABLE
dev=> create materialized view mv as select * from t;
CREATE_MATERIALIZED_VIEW
dev=> create materialized view changelog as with c as changelog from mv select v2 from c;
CREATE_MATERIALIZED_VIEW
dev=> describe changelog
dev-> ;
        Name        |        Type        | Is Hidden | Description 
--------------------+--------------------+-----------+-------------
 v2                 | integer            | false     | 
 mv.v1              | integer            | true      | 
 _change_log_row_id | serial             | true      | 
 primary key        | _change_log_row_id |           | 
 distribution key   | _change_log_row_id |           | 
 table description  | changelog          |           | 
(6 rows)

Is it expected to include mv.v1 here?

@hzxa21
Copy link
Collaborator

hzxa21 commented Jun 17, 2024

Another failing example:

dev=> create table t (v1 int primary key, v2 int);
CREATE_TABLE
dev=> create materialized view mv as select * from t;
CREATE_MATERIALIZED_VIEW
dev=> create materialized view changelog as with c as changelog from mv select v2, op from c;
CREATE_MATERIALIZED_VIEW
dev=>  create materialized view changelog3 as with c as changelog from changelog select * from c;
ERROR:  Failed to run the query

Caused by:
  Invalid input syntax: column "op" specified more than once

chaining changelog on top of another changelog MV

};
yield Message::Chunk(new_chunk);
}
m => yield m,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For watermark message, should we also convert it to a new kind of row, or just discard the watermark? The semantic of watermark will be broken if we just simply yield the original watermark.

@xxhZs xxhZs requested a review from a team as a code owner June 26, 2024 08:59
Copy link

gitguardian bot commented Jun 26, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
9425213 Triggered Generic Password 0b21756 ci/scripts/e2e-sink-test.sh View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Copy link
Collaborator

@hzxa21 hzxa21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM. Please update the Release Note as well.

@@ -287,6 +287,10 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangedLogNode {
bool need_op = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add documentation here in the pb in addition to the rust codes as well?

@xxhZs xxhZs added this pull request to the merge queue Jun 26, 2024
Merged via the queue into main with commit 61e9e52 Jun 26, 2024
39 of 41 checks passed
@xxhZs xxhZs deleted the xxh/stream-subscription branch June 26, 2024 11:53
@hzxa21 hzxa21 changed the title feat(steam): support stream changed log feat(steam): support stream change log Aug 26, 2024
xxhZs added a commit that referenced this pull request Aug 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants