Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): replace MViewTable with CellBasedTable and add relational interfaces #1046

Merged
merged 12 commits into from
Mar 21, 2022
11 changes: 6 additions & 5 deletions rust/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId};
use risingwave_common::error::Result;
use risingwave_pb::plan::plan_node::NodeBody;
use risingwave_storage::table::mview::{MViewTable, MViewTableIter};
use risingwave_storage::table::cell_based_table::{CellBasedTable, CellBasedTableRowIter};
// use risingwave_storage::table::mview::{MViewTable, MViewTableIter};
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

use super::{BoxedExecutor, BoxedExecutorBuilder};
use crate::executor::{Executor, ExecutorBuilder};

/// Executor that scans data from row table
pub struct RowSeqScanExecutor<S: StateStore> {
table: MViewTable<S>,
table: CellBasedTable<S>,
/// An iterator to scan StateStore.
iter: Option<MViewTableIter<S>>,
iter: Option<CellBasedTableRowIter<S>>,
primary: bool,

chunk_size: usize,
Expand All @@ -39,7 +40,7 @@ pub struct RowSeqScanExecutor<S: StateStore> {

impl<S: StateStore> RowSeqScanExecutor<S> {
pub fn new(
table: MViewTable<S>,
table: CellBasedTable<S>,
chunk_size: usize,
primary: bool,
identity: String,
Expand Down Expand Up @@ -90,7 +91,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.collect_vec();
dispatch_state_store!(source.global_batch_env().state_store(), state_store, {
let keyspace = Keyspace::table_root(state_store, &table_id);
let table = MViewTable::new_adhoc(keyspace, column_descs);
let table = CellBasedTable::new_adhoc(keyspace, column_descs);
Ok(Box::new(RowSeqScanExecutor::new(
table,
RowSeqScanExecutorBuilder::DEFAULT_CHUNK_SIZE,
Expand Down
1 change: 1 addition & 0 deletions rust/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct OrderedColumnDesc {
pub column_desc: ColumnDesc,
pub order: OrderType,
}

impl ColumnDesc {
pub fn unnamed(column_id: ColumnId, data_type: DataType) -> ColumnDesc {
ColumnDesc {
Expand Down
5 changes: 3 additions & 2 deletions rust/compute/tests/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::cell_based_table::CellBasedTable;
use risingwave_storage::Keyspace;
use risingwave_stream::executor::{MViewTable, ManagedMViewState};
use risingwave_stream::executor::ManagedMViewState;

#[tokio::test]
async fn test_row_seq_scan() -> Result<()> {
Expand All @@ -43,7 +44,7 @@ async fn test_row_seq_scan() -> Result<()> {
ColumnDesc::unnamed(ColumnId::from(1), schema[1].data_type.clone()),
];

let table = MViewTable::new_adhoc(keyspace, column_descs);
let table = CellBasedTable::new_adhoc(keyspace, column_descs);

let mut executor =
RowSeqScanExecutor::new(table, 1, true, "RowSeqScanExecutor".to_string(), u64::MAX);
Expand Down
5 changes: 3 additions & 2 deletions rust/compute/tests/table_v2_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use risingwave_pb::plan::ColumnDesc as ProstColumnDesc;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::monitor::StateStoreMetrics;
use risingwave_storage::table::mview::MViewTable;
use risingwave_storage::table::cell_based_table::CellBasedTable;
// use risingwave_storage::table::mview::MViewTable;
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_storage::{Keyspace, StateStore, StateStoreImpl};
use risingwave_stream::executor::{
Barrier, Executor as StreamExecutor, MaterializeExecutor, Message, PkIndices, SourceExecutor,
Expand Down Expand Up @@ -202,7 +203,7 @@ async fn test_table_v2_materialize() -> Result<()> {

// Since we have not polled `Materialize`, we cannot scan anything from this table
let keyspace = Keyspace::table_root(memory_state_store, &source_table_id);
let table = MViewTable::new_adhoc(keyspace, column_descs);
let table = CellBasedTable::new_adhoc(keyspace, column_descs);

let mut scan = RowSeqScanExecutor::new(
table.clone(),
Expand Down
27 changes: 27 additions & 0 deletions rust/storage/src/cell_based_row_serializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use risingwave_common::array::Row;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::Result;
use risingwave_common::util::ordered::serialize_pk_and_row;

type KeyBytes = Vec<u8>;
type ValueBytes = Vec<u8>;
#[derive(Clone)]
pub struct CellBasedRowSerializer {}
impl Default for CellBasedRowSerializer {
fn default() -> Self {
Self::new()
}
}
impl CellBasedRowSerializer {
pub fn new() -> Self {
Self {}
}
pub fn serialize(
&mut self,
pk: &[u8],
row: Option<Row>,
column_ids: Vec<ColumnId>,
) -> Result<Vec<(KeyBytes, Option<ValueBytes>)>> {
serialize_pk_and_row(pk, &row, &column_ids)
}
}
1 change: 1 addition & 0 deletions rust/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![feature(let_chains)]

pub mod cell_based_row_deserializer;
pub mod cell_based_row_serializer;
pub mod hummock;
pub mod keyspace;
pub mod memory;
Expand Down
Loading