diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index dbb1775836e21..4447fbdc55932 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -28,13 +28,14 @@
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 use parquet::basic;
+use parquet::column::reader::*;
+use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::record::{Row, RowAccessor};
 use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, Table};
 use crate::execution::error::{ExecutionError, Result};
-use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
+use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
 
 pub struct ParquetTable {
     filename: String,
@@ -54,7 +55,6 @@ impl ParquetTable {
 }
 
 impl Table for ParquetTable {
-
     fn schema(&self) -> &Arc<Schema> {
         &self.schema
     }
@@ -78,10 +78,13 @@ pub struct ParquetFile {
     projection: Vec<usize>,
     batch_size: usize,
     current_row_group: Option<Box<RowGroupReader>>,
+    column_readers: Vec<ColumnReader>,
 }
 
 impl ParquetFile {
     pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
+        println!("open()");
+
         let reader = SerializedFileReader::new(file).unwrap();
 
         let metadata = reader.metadata();
@@ -103,13 +106,22 @@ impl ParquetFile {
                     }
                 };
 
+                let projected_fields: Vec<Field> = projection
+                    .iter()
+                    .map(|i| schema.fields()[*i].clone())
+                    .collect();
+
+                let projected_schema = Arc::new(Schema::new(projected_fields));
+                println!("projected schema: {:?}", projected_schema);
+
                 Ok(ParquetFile {
                     reader: reader,
                     row_group_index: 0,
-                    schema: Arc::new(schema),
+                    schema: projected_schema,
                     projection,
                     batch_size: 64 * 1024,
                     current_row_group: None,
+                    column_readers: vec![],
                 })
             }
             _ => Err(ExecutionError::General(
@@ -120,7 +132,17 @@ impl ParquetFile {
 
     fn load_next_row_group(&mut self) {
         if self.row_group_index < self.reader.num_row_groups() {
+            //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
             let reader = self.reader.get_row_group(self.row_group_index).unwrap();
+
+            self.column_readers = vec![];
+
+            for i in &self.projection {
+                //TODO validate index in bounds
+                self.column_readers
+                    .push(reader.get_column_reader(*i).unwrap());
+            }
+
             self.current_row_group = Some(reader);
             self.row_group_index += 1;
         } else {
@@ -129,86 +151,134 @@ impl ParquetFile {
     }
 
     fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
+        println!("load_batch()");
         match &self.current_row_group {
             Some(reader) => {
-                // read batch of rows into memory
+                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
+                let mut row_count = 0;
+                for i in 0..self.column_readers.len() {
+                    let array: Arc<Array> = match self.column_readers[i] {
+                        ColumnReader::BoolColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (BOOL)".to_string(),
+                            ));
+                        }
+                        ColumnReader::Int32ColumnReader(ref mut r) => {
+                            let mut read_buffer: Vec<i32> =
+                                Vec::with_capacity(self.batch_size);
 
-                // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
+                            for _ in 0..self.batch_size {
+                                read_buffer.push(0);
+                            }
 
-                let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
-                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
-                while let Some(row) = row_iter.next() {
-                    if rows.len() == self.batch_size {
-                        break;
-                    }
-                    rows.push(row);
-                }
-                println!("Loaded {} rows into memory", rows.len());
-
-                // convert to columnar batch
-                let mut batch: Vec<Arc<Array>> =
-                    Vec::with_capacity(self.projection.len());
-                for i in &self.projection {
-                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
-                        DataType::Int32 => {
-                            let mut builder = Int32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_int(*i).unwrap()).unwrap();
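+                            // read_batch fills the value buffer with up to batch_size
+                            // values and returns a (values_read, levels_read) pair;
+                            // passing None for the definition and repetition level
+                            // buffers means null information is not decoded, hence
+                            // the TODO below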
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    println!("Read {} rows", count);
+                                    let mut builder = Int32Builder::new(count);
+                                    builder.append_slice(&read_buffer[0..count]).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
+                        }
-                            }
-                            Arc::new(builder.finish())
-                        }
-                        DataType::Float32 => {
-                            let mut builder = Float32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_float(*i).unwrap()).unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
+                        ColumnReader::Int64ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT64)".to_string(),
+                            ));
+                        }
-                        DataType::Float64 => {
-                            let mut builder = Float64Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder
-                                    .append_value(row.get_double(*i).unwrap())
-                                    .unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
+                        ColumnReader::Int96ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT96)".to_string(),
+                            ));
+                        }
-                        DataType::Utf8 => {
-                            let mut builder = BinaryBuilder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                let bytes = row.get_bytes(*i).unwrap();
-                                builder
-                                    .append_string(
-                                        &String::from_utf8(bytes.data().to_vec())
-                                            .unwrap(),
-                                    )
-                                    .unwrap();
-                            }
-                            Arc::new(builder.finish())
-                        }
+                        ColumnReader::FloatColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FLOAT)".to_string(),
+                            ));
+                        }
+                        ColumnReader::DoubleColumnReader(ref mut r) => {
+                            let mut builder = Float64Builder::new(self.batch_size);
+                            let mut read_buffer: Vec<f64> =
+                                Vec::with_capacity(self.batch_size);
+                            // the buffer must hold batch_size values before the read,
+                            // as on the INT32 path above; read_batch reads at most
+                            // as many values as the slice it is given can hold
+                            for _ in 0..self.batch_size {
+                                read_buffer.push(0.0);
+                            }
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    builder.append_slice(&read_buffer[0..count]).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
+                        }
-                        other => {
-                            return Err(ExecutionError::NotImplemented(format!(
-                                "unsupported column reader type ({:?})",
-                                other
-                            )));
-                        }
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FixedLenByteArray)"
+                                    .to_string(),
+                            ));
+                        }
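+                        // each BYTE_ARRAY value below is copied into a freshly
+                        // allocated String before being appended to the builder;
+                        // appending the raw bytes directly would avoid the extra
+                        // allocation (this is what the inefficiency TODO refers to)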
+                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                            let mut b: Vec<ByteArray> =
+                                Vec::with_capacity(self.batch_size);
+                            for _ in 0..self.batch_size {
+                                b.push(ByteArray::default());
+                            }
+                            match r.read_batch(self.batch_size, None, None, &mut b) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    row_count = count;
+                                    //TODO this is horribly inefficient
+                                    let mut builder = BinaryBuilder::new(row_count);
+                                    for j in 0..row_count {
+                                        let value = b[j].slice(0, b[j].len());
+                                        let bytes: &[u8] = value.data();
+                                        let s =
+                                            String::from_utf8(bytes.to_vec()).unwrap();
+                                        builder.append_string(&s).unwrap();
+                                    }
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
+                        }
                     };
+
+                    println!("Adding array to batch");
                     batch.push(array);
                 }
 
-                println!("Loaded batch of {} rows", rows.len());
+                println!("Loaded batch of {} rows", row_count);
 
-                if rows.len() == 0 {
+                if row_count == 0 {
                     Ok(None)
                 } else {
-                    Ok(Some(RecordBatch::try_new(
-                        self.schema.projection(&self.projection)?,
-                        batch,
-                    )?))
+                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
                 }
             }
             _ => Ok(None),
@@ -342,6 +412,7 @@ mod tests {
     }
 
     fn load_table(name: &str) -> Box<Table> {
+        println!("load_table");
         let testdata = env::var("PARQUET_TEST_DATA").unwrap();
         let filename = format!("{}/{}", testdata, name);
         let table = ParquetTable::new(&filename);
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 5ffae5dafe814..1843974055998 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -26,21 +26,23 @@ extern crate datafusion;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::Table;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::relation::Relation;
-use datafusion::datasource::parquet::ParquetFile;
-use datafusion::datasource::parquet::ParquetTable;
-use datafusion::datasource::{RecordBatchIterator, Table};
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
 
 #[test]
 fn parquet_query() {
     let mut ctx = ExecutionContext::new();
-    ctx.register_table("alltypes_plain", load_parquet_table("alltypes_plain.parquet"));
+    ctx.register_table(
+        "alltypes_plain",
+        load_parquet_table("alltypes_plain.parquet"),
+    );
     let sql = "SELECT id, string_col FROM alltypes_plain";
     let actual = execute(&mut ctx, sql);
-    let expected = "tbd".to_string();
+    let expected = "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
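+    // note: the query has no ORDER BY, so the expected rows are listed in the
+    // order in which they are read from the file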
     assert_eq!(expected, actual);
 }
@@ -187,7 +189,9 @@ fn load_parquet_table(name: &str) -> Rc<Table> {
 
 /// Execute query and return result set as tab delimited string
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
-    let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
+    // build the logical plan first so that it can be printed for debugging
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    println!("Plan: {:?}", plan);
+    let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
     result_str(&results)
 }