From 1e4d67f147854df1c946c843b99ce4a7c46c72f0 Mon Sep 17 00:00:00 2001
From: Sicheng Pan
Date: Tue, 24 Sep 2024 16:12:16 -0700
Subject: [PATCH] [ENH] Implement `MetadataProvider`, `RoaringMetadataFilter`,
 and refactor `MetadataFilteringOperator`

---
 .../execution/operators/metadata_filtering.rs | 1877 ++++++++++-------
 1 file changed, 1164 insertions(+), 713 deletions(-)

diff --git a/rust/worker/src/execution/operators/metadata_filtering.rs b/rust/worker/src/execution/operators/metadata_filtering.rs
index ba24e26f977..083aaf78a9b 100644
--- a/rust/worker/src/execution/operators/metadata_filtering.rs
+++ b/rust/worker/src/execution/operators/metadata_filtering.rs
@@ -1,28 +1,47 @@
+use std::{
+    collections::{BTreeMap, HashMap},
+    ops::{BitAnd, BitOr, Bound},
+};
+
 use crate::{
     execution::operator::Operator,
     segment::{
         metadata_segment::{MetadataSegmentError, MetadataSegmentReader},
         record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
-        LogMaterializer, LogMaterializerError,
+        LogMaterializer, LogMaterializerError, MaterializedLogRecord,
     },
 };
-use chroma_blockstore::{key::KeyWrapper, provider::BlockfileProvider};
+use chroma_blockstore::provider::BlockfileProvider;
 use chroma_error::{ChromaError, ErrorCodes};
-use chroma_index::{
-    fulltext::types::process_where_document_clause_with_callback,
-    metadata::types::{process_where_clause_with_callback, MetadataIndexError},
-    utils::{merge_sorted_vecs_conjunction, merge_sorted_vecs_disjunction},
-};
+use chroma_index::metadata::types::MetadataIndexError;
 use chroma_types::{
-    Chunk, LogRecord, MaterializedLogOperation, MetadataValue, Segment, Where,
-    WhereClauseComparator, WhereDocument, WhereDocumentOperator,
+    BooleanOperator, Chunk, DirectDocumentComparison, DirectWhereComparison, DocumentOperator,
+    LogRecord, MaterializedLogOperation, MetadataSetValue, MetadataValue, PrimitiveOperator,
+    Segment, SetOperator, SignedRoaringBitmap, Where, WhereChildren, WhereComparison,
};
-use core::panic;
 use roaring::RoaringBitmap;
-use std::collections::{HashMap, HashSet};
 use thiserror::Error;
 use tonic::async_trait;
-use tracing::{Instrument, Span};
+use tracing::{trace, Instrument, Span};
+
+/// # Description
+/// The `MetadataFilteringOperator` should produce the offset ids of the matching documents.
+///
+/// # Input
+/// - `blockfile_provider` / `record_segment` / `metadata_segment`: handles to the underlying data.
+/// - `log_record`: the chunk of log that is not yet compacted, representing the latest updates.
+/// - `query_ids`: user-provided ids, which must be a superset of the returned documents.
+/// - `where_clause`: a boolean predicate on the metadata and the content of the document.
+/// - `offset`: the number of records with the smallest offset ids to skip, if specified.
+/// - `limit`: the number of records with the smallest offset ids to take after the skip, if specified.
+///
+/// # Output
+/// - `log_records`: the same `log_record` chunk from the input.
+/// - `offset_ids`: the matching offset ids, drawn from both the log and compact storage.
+///
+/// # Note
+/// - The `MetadataProvider` enum can be viewed as a universal interface for the metadata and
+///   document indices.
+/// - In the output, the offset ids contributed by the log form a subset of `offset_ids`.
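+///
+/// # Example
+/// Intermediate filter results are tracked as `SignedRoaringBitmap`s so that negated
+/// predicates stay cheap. A minimal sketch of how such a value is materialized against
+/// the set of live offset ids (assuming the `SignedRoaringBitmap` from `chroma_types`,
+/// where `Include` holds matches and `Exclude` holds non-matches):
+/// ```ignore
+/// use chroma_types::SignedRoaringBitmap::{Exclude, Include};
+/// use roaring::RoaringBitmap;
+///
+/// let domain: RoaringBitmap = (1..=5).collect();
+/// // `a != 1` is evaluated as the complement of `a == 1`:
+/// let neq = Exclude(RoaringBitmap::from_iter([1u32]));
+/// let matched = match neq {
+///     Include(rbm) => rbm,
+///     Exclude(rbm) => domain - rbm, // {2, 3, 4, 5}
+/// };
+/// ```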
 #[derive(Debug)]
 pub(crate) struct MetadataFilteringOperator {}

@@ -35,33 +54,36 @@ impl MetadataFilteringOperator {

 #[derive(Debug)]
 pub(crate) struct MetadataFilteringInput {
-    log_record: Chunk<LogRecord>,
+    blockfile_provider: BlockfileProvider,
     record_segment: Segment,
     metadata_segment: Segment,
-    blockfile_provider: BlockfileProvider,
-    where_clause: Option<Where>,
-    where_document_clause: Option<WhereDocument>,
+    log_record: Chunk<LogRecord>,
     query_ids: Option<Vec<String>>,
+    where_clause: Option<Where>,
+    offset: Option<u32>,
+    limit: Option<u32>,
 }

 impl MetadataFilteringInput {
     pub(crate) fn new(
-        log_record: Chunk<LogRecord>,
+        blockfile_provider: BlockfileProvider,
         record_segment: Segment,
         metadata_segment: Segment,
-        blockfile_provider: BlockfileProvider,
-        where_clause: Option<Where>,
-        where_document_clause: Option<WhereDocument>,
+        log_record: Chunk<LogRecord>,
         query_ids: Option<Vec<String>>,
+        where_clause: Option<Where>,
+        offset: Option<u32>,
+        limit: Option<u32>,
     ) -> Self {
         Self {
-            log_record,
+            blockfile_provider,
             record_segment,
             metadata_segment,
-            blockfile_provider,
-            where_clause,
-            where_document_clause,
+            log_record,
             query_ids,
+            where_clause,
+            offset,
+            limit,
         }
     }
 }

@@ -69,31 +91,329 @@ impl MetadataFilteringInput {
 #[derive(Debug)]
 pub(crate) struct MetadataFilteringOutput {
     pub(crate) log_records: Chunk<LogRecord>,
-    // Offset Ids of documents that match the where and where_document clauses.
-    pub(crate) where_condition_filtered_offset_ids: Option<Vec<u32>>,
-    // Offset ids of documents that the user specified in the query directly.
-    pub(crate) user_supplied_filtered_offset_ids: Option<Vec<u32>>,
+    pub(crate) offset_ids: RoaringBitmap,
 }

 #[derive(Error, Debug)]
 pub(crate) enum MetadataFilteringError {
     #[error("Error creating record segment reader {0}")]
-    RecordSegmentReaderCreation(#[from] RecordSegmentReaderCreationError),
+    RecordSegmentReaderCreationError(#[from] RecordSegmentReaderCreationError),
     #[error("Error materializing logs {0}")]
-    LogMaterialization(#[from] LogMaterializerError),
+    LogMaterializationError(#[from] LogMaterializerError),
     #[error("Error filtering documents by where or where_document clauses {0}")]
-    Index(#[from] MetadataIndexError),
+    IndexError(#[from] MetadataIndexError),
     #[error("Error from metadata segment reader {0}")]
-    MetadataSegmentReader(#[from] MetadataSegmentError),
+    MetadataSegmentReaderError(#[from] MetadataSegmentError),
+    #[error("Error reading from record segment")]
+    RecordSegmentReaderError,
+    #[error("Invalid input")]
+    InvalidInput,
 }

 impl ChromaError for MetadataFilteringError {
     fn code(&self) -> ErrorCodes {
         match self {
-            MetadataFilteringError::RecordSegmentReaderCreation(e) => e.code(),
-            MetadataFilteringError::LogMaterialization(e) => e.code(),
-            MetadataFilteringError::Index(e) => e.code(),
-            MetadataFilteringError::MetadataSegmentReader(e) => e.code(),
+            MetadataFilteringError::RecordSegmentReaderCreationError(e) => e.code(),
+            MetadataFilteringError::LogMaterializationError(e) => e.code(),
+            MetadataFilteringError::IndexError(e) => e.code(),
+            MetadataFilteringError::MetadataSegmentReaderError(e) => e.code(),
+            MetadataFilteringError::RecordSegmentReaderError => ErrorCodes::Internal,
+            MetadataFilteringError::InvalidInput => ErrorCodes::InvalidArgument,
         }
     }
 }
+
+/// This struct provides an abstraction over the materialized logs that is similar to the metadata segment
+pub(crate) struct MetadataLogReader<'me> {
+    // This maps metadata keys to a `BTreeMap` mapping values to offset ids,
+    // mimicking the layout in the metadata segment.
+    // TODO: Maybe a sorted vector with binary search is more lightweight and performant?
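+    //
+    // A minimal sketch of how this layout serves range queries, assuming only
+    // standard-library `BTreeMap` semantics:
+    //
+    //     use std::ops::Bound::{Excluded, Unbounded};
+    //     let mut index: BTreeMap<i64, RoaringBitmap> = BTreeMap::new();
+    //     index.entry(1).or_default().insert(7);
+    //     index.entry(5).or_default().insert(9);
+    //     // "key > 1" folds every bitmap in the range (Excluded(1), Unbounded):
+    //     let gt: RoaringBitmap = index
+    //         .range((Excluded(1), Unbounded))
+    //         .map(|(_, v)| v)
+    //         .fold(RoaringBitmap::new(), |acc, v| acc | v);
+    //     assert_eq!(gt, RoaringBitmap::from_iter([9u32]));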
+    compact_metadata: HashMap<&'me str, BTreeMap<&'me MetadataValue, RoaringBitmap>>,
+    // This maps offset ids to documents, excluding deleted ones
+    document: HashMap<u32, &'me str>,
+    // This contains all offset ids that are present in the materialized log
+    domain: RoaringBitmap,
+    // This maps user ids to offset ids, excluding deleted ones
+    // The value set should be a subset of `domain`
+    uid_to_oid: HashMap<&'me str, u32>,
+}
+
+impl<'me> MetadataLogReader<'me> {
+    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord<'me>>) -> Self {
+        let mut compact_metadata: HashMap<_, BTreeMap<&MetadataValue, RoaringBitmap>> =
+            HashMap::new();
+        let mut document = HashMap::new();
+        let mut domain = RoaringBitmap::new();
+        let mut uid_to_oid = HashMap::new();
+        for (log, _) in logs.iter() {
+            domain.insert(log.offset_id);
+            if !matches!(
+                log.final_operation,
+                MaterializedLogOperation::DeleteExisting
+            ) {
+                uid_to_oid.insert(log.merged_user_id_ref(), log.offset_id);
+                let log_meta = log.merged_metadata_ref();
+                for (key, val) in log_meta.into_iter() {
+                    compact_metadata
+                        .entry(key)
+                        .or_default()
+                        .entry(val)
+                        .or_default()
+                        .insert(log.offset_id);
+                }
+                if let Some(doc) = log.merged_document_ref() {
+                    document.insert(log.offset_id, doc);
+                }
+            }
+        }
+        Self {
+            compact_metadata,
+            document,
+            domain,
+            uid_to_oid,
+        }
+    }
+
+    pub(crate) fn get(
+        &self,
+        key: &str,
+        val: &MetadataValue,
+        op: &PrimitiveOperator,
+    ) -> Result<RoaringBitmap, MetadataFilteringError> {
+        use Bound::*;
+        use PrimitiveOperator::*;
+        if let Some(btm) = self.compact_metadata.get(key) {
+            let bounds = match op {
+                Equal => (Included(&val), Included(&val)),
+                GreaterThan => (Excluded(&val), Unbounded),
+                GreaterThanOrEqual => (Included(&val), Unbounded),
+                LessThan => (Unbounded, Excluded(&val)),
+                LessThanOrEqual => (Unbounded, Included(&val)),
+                // Inequality filter is not supported at the metadata provider level
+                NotEqual => return Err(MetadataFilteringError::InvalidInput),
+            };
+            Ok(btm
+                .range::<&MetadataValue, _>(bounds)
+                .map(|(_, v)| v)
+                .fold(RoaringBitmap::new(), BitOr::bitor))
+        } else {
+            Ok(RoaringBitmap::new())
+        }
+    }
+
+    pub(crate) fn search_user_ids(&self, uids: &Vec<String>) -> RoaringBitmap {
+        uids.iter()
+            .filter_map(|uid| self.uid_to_oid.get(uid.as_str()))
+            .collect()
+    }
+
+    pub(crate) fn active_domain(&'me self) -> RoaringBitmap {
+        self.uid_to_oid.values().collect()
+    }
+}
+
+pub(crate) enum MetadataProvider<'me> {
+    Compact(&'me MetadataSegmentReader<'me>),
+    Log(&'me MetadataLogReader<'me>),
+}
+
+impl<'me> MetadataProvider<'me> {
+    pub(crate) fn from_metadata_segment_reader(reader: &'me MetadataSegmentReader<'me>) -> Self {
+        Self::Compact(reader)
+    }
+
+    pub(crate) fn from_metadata_log_reader(reader: &'me MetadataLogReader<'me>) -> Self {
+        Self::Log(reader)
+    }
+
+    pub(crate) async fn filter_by_document(
+        &self,
+        query: &str,
+    ) -> Result<RoaringBitmap, MetadataFilteringError> {
+        use MetadataProvider::*;
+        match self {
+            Compact(metadata_segment_reader) => {
+                if let Some(reader) = metadata_segment_reader.full_text_index_reader.as_ref() {
+                    Ok(reader
+                        .search(query)
+                        .await
+                        .map_err(MetadataIndexError::FullTextError)?)
+                } else {
+                    Ok(RoaringBitmap::new())
+                }
+            }
+            Log(metadata_log_reader) => Ok(metadata_log_reader
+                .document
+                .iter()
+                .filter_map(|(oid, doc)| doc.contains(query).then_some(oid))
+                .collect()),
+        }
+    }
+
+    pub(crate) async fn filter_by_metadata(
+        &self,
+        key: &str,
+        val: &MetadataValue,
+        op: &PrimitiveOperator,
+    ) -> Result<RoaringBitmap, MetadataFilteringError> {
+        use MetadataProvider::*;
+        use MetadataValue::*;
+        use PrimitiveOperator::*;
+        match self {
+            Compact(metadata_segment_reader) => {
+                let (metadata_index_reader, kw) = match val {
+                    Bool(b) => (
+                        metadata_segment_reader.bool_metadata_index_reader.as_ref(),
+                        &(*b).into(),
+                    ),
+                    Int(i) => (
+                        metadata_segment_reader.u32_metadata_index_reader.as_ref(),
+                        &(*i as u32).into(),
+                    ),
+                    Float(f) => (
+                        metadata_segment_reader.f32_metadata_index_reader.as_ref(),
+                        &(*f as f32).into(),
+                    ),
+                    Str(s) => (
+                        metadata_segment_reader
+                            .string_metadata_index_reader
+                            .as_ref(),
+                        &s.as_str().into(),
+                    ),
+                };
+                if let Some(reader) = metadata_index_reader {
+                    match op {
+                        Equal => Ok(reader.get(key, kw).await?),
+                        GreaterThan => Ok(reader.gt(key, kw).await?),
+                        GreaterThanOrEqual => Ok(reader.gte(key, kw).await?),
+                        LessThan => Ok(reader.lt(key, kw).await?),
+                        LessThanOrEqual => Ok(reader.lte(key, kw).await?),
+                        // Inequality filter is not supported at the metadata provider level
+                        NotEqual => Err(MetadataFilteringError::InvalidInput),
+                    }
+                } else {
+                    Ok(RoaringBitmap::new())
+                }
+            }
+            Log(metadata_log_reader) => metadata_log_reader.get(key, val, op),
+        }
+    }
+}
+
+pub(crate) trait RoaringMetadataFilter<'me> {
+    async fn eval(
+        &'me self,
+        meta_provider: &MetadataProvider<'me>,
+    ) -> Result<SignedRoaringBitmap, MetadataFilteringError>;
+}
+
+impl<'me> RoaringMetadataFilter<'me> for Where {
+    async fn eval(
+        &'me self,
+        meta_provider: &MetadataProvider<'me>,
+    ) -> Result<SignedRoaringBitmap, MetadataFilteringError> {
+        use Where::*;
+        match self {
+            DirectWhereComparison(direct_comparison) => direct_comparison.eval(meta_provider).await,
+            DirectWhereDocumentComparison(direct_document_comparison) => {
+                direct_document_comparison.eval(meta_provider).await
+            }
+            // The recursive future is boxed so its size is known to the compiler
+            WhereChildren(where_children) => Box::pin(where_children.eval(meta_provider)).await,
+        }
+    }
+}
+
+impl<'me> RoaringMetadataFilter<'me> for DirectWhereComparison {
+    async fn eval(
+        &'me self,
+        meta_provider: &MetadataProvider<'me>,
+    ) -> Result<SignedRoaringBitmap, MetadataFilteringError> {
+        use MetadataSetValue::*;
+        use PrimitiveOperator::*;
+        use SetOperator::*;
+        use SignedRoaringBitmap::*;
+        let result = match &self.comparison {
+            WhereComparison::Primitive(primitive_operator, metadata_value) => {
+                match primitive_operator {
+                    // We convert the inequality check into an equality check, and then negate the result
+                    NotEqual => Exclude(
+                        meta_provider
+                            .filter_by_metadata(&self.key, metadata_value, &Equal)
+                            .await?,
+                    ),
+                    Equal | GreaterThan | GreaterThanOrEqual | LessThan | LessThanOrEqual => {
+                        Include(
+                            meta_provider
+                                .filter_by_metadata(&self.key, metadata_value, primitive_operator)
+                                .await?,
+                        )
+                    }
+                }
+            }
+            WhereComparison::Set(set_operator, metadata_set_value) => {
+                let child_values: Vec<_> = match metadata_set_value {
+                    Bool(vec) => vec.iter().map(|b| MetadataValue::Bool(*b)).collect(),
+                    Int(vec) => vec.iter().map(|i| MetadataValue::Int(*i)).collect(),
+                    Float(vec) => vec.iter().map(|f| MetadataValue::Float(*f)).collect(),
+                    Str(vec) => vec.iter().map(|s| MetadataValue::Str(s.clone())).collect(),
+                };
+                let mut child_evals = Vec::with_capacity(child_values.len());
+                for val in child_values {
+                    let eval = meta_provider
+                        .filter_by_metadata(&self.key, &val, &Equal)
+                        .await?;
+                    match set_operator {
+                        In =>
+                            child_evals.push(Include(eval)),
+                        NotIn => child_evals.push(Exclude(eval)),
+                    };
+                }
+                match set_operator {
+                    In => child_evals
+                        .into_iter()
+                        .fold(SignedRoaringBitmap::empty(), BitOr::bitor),
+                    NotIn => child_evals
+                        .into_iter()
+                        .fold(SignedRoaringBitmap::full(), BitAnd::bitand),
+                }
+            }
+        };
+        Ok(result)
+    }
+}
+
+impl<'me> RoaringMetadataFilter<'me> for DirectDocumentComparison {
+    async fn eval(
+        &'me self,
+        meta_provider: &MetadataProvider<'me>,
+    ) -> Result<SignedRoaringBitmap, MetadataFilteringError> {
+        use DocumentOperator::*;
+        use SignedRoaringBitmap::*;
+        let contain = meta_provider
+            .filter_by_document(self.document.as_str())
+            .await?;
+        match self.operator {
+            Contains => Ok(Include(contain)),
+            NotContains => Ok(Exclude(contain)),
+        }
+    }
+}
+
+impl<'me> RoaringMetadataFilter<'me> for WhereChildren {
+    async fn eval(
+        &'me self,
+        meta_provider: &MetadataProvider<'me>,
+    ) -> Result<SignedRoaringBitmap, MetadataFilteringError> {
+        use BooleanOperator::*;
+        let mut child_evals = Vec::new();
+        for child in &self.children {
+            child_evals.push(child.eval(meta_provider).await?);
+        }
+        match self.operator {
+            And => Ok(child_evals
+                .into_iter()
+                .fold(SignedRoaringBitmap::full(), BitAnd::bitand)),
+            Or => Ok(child_evals
+                .into_iter()
+                .fold(SignedRoaringBitmap::empty(), BitOr::bitor)),
+        }
+    }
+}

@@ -110,597 +430,141 @@ impl Operator<MetadataFilteringInput, MetadataFilteringOutput> for MetadataFilteringOperator
         &self,
         input: &MetadataFilteringInput,
     ) -> Result<MetadataFilteringOutput, MetadataFilteringError> {
-        // Step 0: Create the record segment reader.
-        let record_segment_reader: Option<RecordSegmentReader>;
-        match RecordSegmentReader::from_segment(&input.record_segment, &input.blockfile_provider)
-            .await
+        use SignedRoaringBitmap::*;
+        trace!(
+            "[MetadataFilteringOperator] segment id: {}",
+            input.record_segment.id.to_string()
+        );
+
+        // Initialize record segment reader
+        let record_segment_reader = match RecordSegmentReader::from_segment(
+            &input.record_segment,
+            &input.blockfile_provider,
+        )
+        .await
         {
-            Ok(reader) => {
-                record_segment_reader = Some(reader);
+            Ok(reader) => Ok(Some(reader)),
+            // Uninitialized segment is fine and means that the record
+            // segment is not yet initialized in storage
+            Err(e) if matches!(*e, RecordSegmentReaderCreationError::UninitializedSegment) => {
+                Ok(None)
             }
             Err(e) => {
-                match *e {
-                    // Uninitialized segment is fine and means that the record
-                    // segment is not yet initialized in storage.
-                    RecordSegmentReaderCreationError::UninitializedSegment => {
-                        record_segment_reader = None;
-                    }
-                    RecordSegmentReaderCreationError::BlockfileOpenError(e) => {
-                        tracing::error!("Error creating record segment reader {}", e);
-                        return Err(MetadataFilteringError::RecordSegmentReaderCreation(
-                            RecordSegmentReaderCreationError::BlockfileOpenError(e),
-                        ));
-                    }
-                    RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
-                        tracing::error!("Error creating record segment reader {}", e);
-                        return Err(MetadataFilteringError::RecordSegmentReaderCreation(
-                            RecordSegmentReaderCreationError::InvalidNumberOfFiles,
-                        ));
-                    }
-                };
+                tracing::error!("Error creating record segment reader {}", e);
+                Err(MetadataFilteringError::RecordSegmentReaderCreationError(*e))
             }
-        };
-        // Step 1: Materialize the logs.
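+        // The materializer collapses the raw log into one record per offset id;
+        // everything downstream relies only on these accessors (a sketch, using
+        // the same types as `MetadataLogReader::new` above):
+        //
+        //     for (record, _) in materialized_logs.iter() {
+        //         let deleted = matches!(record.final_operation,
+        //                                MaterializedLogOperation::DeleteExisting);
+        //         if !deleted {
+        //             let _ = (record.offset_id, record.merged_user_id_ref(),
+        //                      record.merged_metadata_ref(), record.merged_document_ref());
+        //         }
+        //     }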
- let materializer = - LogMaterializer::new(record_segment_reader, input.log_record.clone(), None); - let mat_records = match materializer + }?; + + // Materialize the logs + let materializer = LogMaterializer::new( + record_segment_reader.clone(), + input.log_record.clone(), + None, + ); + let materialized_logs = materializer .materialize() .instrument(tracing::trace_span!(parent: Span::current(), "Materialize logs")) .await - { - Ok(records) => records, - Err(e) => { - return Err(MetadataFilteringError::LogMaterialization(e)); - } - }; - // Step 2: Apply where and where_document clauses on the materialized logs. - let mut ids_to_metadata: HashMap> = HashMap::new(); - let mut ids_in_mat_log = HashSet::new(); - for (records, _) in mat_records.iter() { - // It's important to account for even the deleted records here - // so that they can be ignored when reading from the segment later. - ids_in_mat_log.insert(records.offset_id); - // Skip deleted records. - if records.final_operation == MaterializedLogOperation::DeleteExisting { - continue; - } - ids_to_metadata.insert(records.offset_id, records.merged_metadata_ref()); - } - let clo = |metadata_key: &str, - metadata_value: &chroma_blockstore::key::KeyWrapper, - metadata_type: chroma_types::MetadataType, - comparator: WhereClauseComparator| { - match metadata_type { - chroma_types::MetadataType::StringType => match comparator { - WhereClauseComparator::Equal => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key equal to this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Str(string_value), - KeyWrapper::String(where_value), - ) = (*val, metadata_value) - { - if *string_value == *where_value { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::NotEqual => { - todo!(); - } - // We don't allow these comparators for strings. - WhereClauseComparator::LessThan => { - unimplemented!(); - } - WhereClauseComparator::LessThanOrEqual => { - unimplemented!(); - } - WhereClauseComparator::GreaterThan => { - unimplemented!(); - } - WhereClauseComparator::GreaterThanOrEqual => { - unimplemented!(); - } - }, - chroma_types::MetadataType::BoolType => match comparator { - WhereClauseComparator::Equal => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key equal to this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Bool(bool_value), - KeyWrapper::Bool(where_value), - ) = (*val, metadata_value) - { - if *bool_value == *where_value { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::NotEqual => { - todo!(); - } - // We don't allow these comparators for booleans. - WhereClauseComparator::LessThan => { - unimplemented!(); - } - WhereClauseComparator::LessThanOrEqual => { - unimplemented!(); - } - WhereClauseComparator::GreaterThan => { - unimplemented!(); - } - WhereClauseComparator::GreaterThanOrEqual => { - unimplemented!(); - } - }, - chroma_types::MetadataType::IntType => match comparator { - WhereClauseComparator::Equal => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key equal to this value. 
- for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Int(int_value), - KeyWrapper::Uint32(where_value), - ) = (*val, metadata_value) - { - if *int_value as u32 == *where_value { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::NotEqual => { - todo!(); - } - WhereClauseComparator::LessThan => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key less than this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Int(int_value), - KeyWrapper::Uint32(where_value), - ) = (*val, metadata_value) - { - if ((*int_value) as u32) < (*where_value) { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::LessThanOrEqual => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key <= this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Int(int_value), - KeyWrapper::Uint32(where_value), - ) = (*val, metadata_value) - { - if ((*int_value) as u32) <= (*where_value) { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::GreaterThan => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key > this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Int(int_value), - KeyWrapper::Uint32(where_value), - ) = (*val, metadata_value) - { - if ((*int_value) as u32) > (*where_value) { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::GreaterThanOrEqual => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key >= this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Int(int_value), - KeyWrapper::Uint32(where_value), - ) = (*val, metadata_value) - { - if ((*int_value) as u32) >= (*where_value) { - result.insert(*offset_id); - } - } - } - } - result - } - }, - chroma_types::MetadataType::DoubleType => match comparator { - WhereClauseComparator::Equal => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key equal to this value. - for (offset_id, meta_map) in &ids_to_metadata { - if let Some(val) = meta_map.get(metadata_key) { - if let ( - MetadataValue::Float(float_value), - KeyWrapper::Float32(where_value), - ) = (*val, metadata_value) - { - if ((*float_value) as f32) == (*where_value) { - result.insert(*offset_id); - } - } - } - } - result - } - WhereClauseComparator::NotEqual => { - todo!(); - } - WhereClauseComparator::LessThan => { - let mut result = RoaringBitmap::new(); - // Construct a bitmap consisting of all offset ids - // that have this key < this value. 
-                            for (offset_id, meta_map) in &ids_to_metadata {
-                                if let Some(val) = meta_map.get(metadata_key) {
-                                    if let (
-                                        MetadataValue::Float(float_value),
-                                        KeyWrapper::Float32(where_value),
-                                    ) = (*val, metadata_value)
-                                    {
-                                        if ((*float_value) as f32) < (*where_value) {
-                                            result.insert(*offset_id);
-                                        }
-                                    }
-                                }
-                            }
-                            result
-                        }
-                        WhereClauseComparator::LessThanOrEqual => {
-                            let mut result = RoaringBitmap::new();
-                            // Construct a bitmap consisting of all offset ids
-                            // that have this key <= this value.
-                            for (offset_id, meta_map) in &ids_to_metadata {
-                                if let Some(val) = meta_map.get(metadata_key) {
-                                    if let (
-                                        MetadataValue::Float(float_value),
-                                        KeyWrapper::Float32(where_value),
-                                    ) = (*val, metadata_value)
-                                    {
-                                        if ((*float_value) as f32) <= (*where_value) {
-                                            result.insert(*offset_id);
-                                        }
-                                    }
-                                }
-                            }
-                            result
-                        }
-                        WhereClauseComparator::GreaterThan => {
-                            let mut result = RoaringBitmap::new();
-                            // Construct a bitmap consisting of all offset ids
-                            // that have this key > this value.
-                            for (offset_id, meta_map) in &ids_to_metadata {
-                                if let Some(val) = meta_map.get(metadata_key) {
-                                    if let (
-                                        MetadataValue::Float(float_value),
-                                        KeyWrapper::Float32(where_value),
-                                    ) = (*val, metadata_value)
-                                    {
-                                        if ((*float_value) as f32) > (*where_value) {
-                                            result.insert(*offset_id);
-                                        }
-                                    }
-                                }
-                            }
-                            result
-                        }
-                        WhereClauseComparator::GreaterThanOrEqual => {
-                            let mut result = RoaringBitmap::new();
-                            // Construct a bitmap consisting of all offset ids
-                            // that have this key >= this value.
-                            for (offset_id, meta_map) in &ids_to_metadata {
-                                if let Some(val) = meta_map.get(metadata_key) {
-                                    if let (
-                                        MetadataValue::Float(float_value),
-                                        KeyWrapper::Float32(where_value),
-                                    ) = (*val, metadata_value)
-                                    {
-                                        if ((*float_value) as f32) >= (*where_value) {
-                                            result.insert(*offset_id);
-                                        }
-                                    }
-                                }
-                            }
-                            result
-                        }
-                    },
-                    chroma_types::MetadataType::StringListType => {
-                        todo!();
-                    }
-                    chroma_types::MetadataType::IntListType => {
-                        todo!();
-                    }
-                    chroma_types::MetadataType::DoubleListType => {
-                        todo!();
-                    }
-                    chroma_types::MetadataType::BoolListType => {
-                        todo!();
-                    }
-                }
-        };
+                .map_err(|e| {
+                    tracing::error!("Error materializing log: {}", e);
+                    MetadataFilteringError::LogMaterializationError(e)
+                })?;
+        let metadata_log_reader = MetadataLogReader::new(&materialized_logs);
+        let log_metadata_provider =
+            MetadataProvider::from_metadata_log_reader(&metadata_log_reader);
+
+        // Initialize metadata segment reader
+        let metadata_segment_reader =
+            MetadataSegmentReader::from_segment(&input.metadata_segment, &input.blockfile_provider)
+                .await
+                .map_err(MetadataFilteringError::MetadataSegmentReaderError)?;
+        let compact_metadata_provider =
+            MetadataProvider::from_metadata_segment_reader(&metadata_segment_reader);
+
+        // Get the offset ids corresponding to the user-provided ids
+        let (user_log_oids, user_compact_oids) = if let Some(uids) = input.query_ids.as_ref() {
+            let log_oids = Include(metadata_log_reader.search_user_ids(uids));
+            let compact_oids = if let Some(reader) = record_segment_reader.as_ref() {
+                let mut compact_oids = RoaringBitmap::new();
+                for uid in uids {
+                    if let Ok(oid) = reader.get_offset_id_for_user_id(uid.as_str()).await {
+                        compact_oids.insert(oid);
+                    }
+                }
+                Include(compact_oids)
+            } else {
+                SignedRoaringBitmap::full()
+            };
+            (log_oids, compact_oids)
+        } else {
+            (SignedRoaringBitmap::full(), SignedRoaringBitmap::full())
+        };
-        // This will be sorted by offset ids since rbms.insert() inserts in sorted order.
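+        // `SignedRoaringBitmap::full()` is the identity for `&`, so an absent
+        // `query_ids` constrains nothing; a sketch of the intended algebra
+        // (assuming the `chroma_types` implementation of `BitAnd`):
+        //
+        //     let anything = SignedRoaringBitmap::full();    // Exclude(∅)
+        //     let some = Include(RoaringBitmap::from_iter([1u32, 2]));
+        //     let both = anything & some;                    // still Include({1, 2})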
-        let mtsearch_res = match &input.where_clause {
-            Some(where_clause) => match process_where_clause_with_callback(where_clause, &clo) {
-                Ok(r) => {
-                    let ids_as_u32: Vec<u32> = r.into_iter().map(|index| index as u32).collect();
-                    tracing::info!(
-                        "Filtered {} results from log based on where clause filtering",
-                        ids_as_u32.len()
-                    );
-                    Some(ids_as_u32)
-                }
-                Err(e) => {
-                    tracing::error!("Error filtering logs based on where clause {:?}", e);
-                    return Err(MetadataFilteringError::Index(e));
-                }
-            },
-            None => {
-                tracing::info!("Where clause not supplied by the user");
-                None
-            }
-        };
+
+        // Filter the offset ids in the log if the where clause is provided
+        let filtered_log_oids = if let Some(clause) = input.where_clause.as_ref() {
+            clause.eval(&log_metadata_provider).await? & user_log_oids
+        } else {
+            user_log_oids
+        };
-        // AND this with where_document clause.
-        let cb = |query: &str, op: WhereDocumentOperator| {
-            match op {
-                WhereDocumentOperator::Contains => {
-                    // Note matching_contains is sorted (which is needed for correctness)
-                    // because materialized log record is sorted by offset id.
-                    let mut matching_contains = vec![];
-                    // Upstream sorts materialized records by offset id so matching_contains
-                    // will be sorted.
-                    // Note: Uncomment this out when adding FTS support for queries
-                    // containing _ or %. Currently, we disable such scenarios in tests
-                    // for distributed version.
-                    // Emulate sqlite behavior. _ and % match to any character in sqlite.
-                    // let normalized_query = query.replace("_", ".").replace("%", ".");
-                    // let re = Regex::new(normalized_query.as_str()).unwrap();
-                    for (record, _) in mat_records.iter() {
-                        if record.final_operation == MaterializedLogOperation::DeleteExisting {
-                            continue;
-                        }
-                        if let Some(doc) = record.merged_document_ref() {
-                            /* if re.is_match(doc) { */
-                            if doc.contains(query) {
-                                matching_contains.push(record.offset_id as i32);
-                            }
-                        };
-                    }
-                    matching_contains
-                }
-                WhereDocumentOperator::NotContains => {
-                    todo!()
-                }
-            }
-        };
+
+        // Materialize the offset ids to include from the log in the final result
+        let materialized_log_oids = match filtered_log_oids {
+            Include(rbm) => rbm,
+            Exclude(rbm) => metadata_log_reader.active_domain() - rbm,
+        };
-        // fts_result will be sorted by offset id.
-        let fts_result = match &input.where_document_clause {
-            Some(where_doc_clause) => {
-                match process_where_document_clause_with_callback(where_doc_clause, &cb) {
-                    Ok(res) => {
-                        let ids_as_u32: Vec<u32> =
-                            res.into_iter().map(|index| index as u32).collect();
-                        tracing::info!(
-                            "Filtered {} results from log based on where document filtering",
-                            ids_as_u32.len()
-                        );
-                        Some(ids_as_u32)
-                    }
-                    Err(e) => {
-                        tracing::error!("Error filtering logs based on where document {:?}", e);
-                        return Err(MetadataFilteringError::Index(e));
-                    }
-                }
-            }
-            None => {
-                tracing::info!("Where document not supplied by the user");
-                None
-            }
-        };
+
+        // Filter the offset ids in the metadata segment if the where clause is provided
+        // This always excludes the offset ids that are already present in the materialized log
+        let filtered_compact_oids = if let Some(clause) = input.where_clause.as_ref() {
+            clause.eval(&compact_metadata_provider).await?
+ & user_compact_oids + & Exclude(metadata_log_reader.domain) + } else { + user_compact_oids & Exclude(metadata_log_reader.domain) + }; + + // Materialize the offset ids to include from the metadata segment in the final result + // This should only contain offset ids not present in the materialized log + let materialized_compact_oids = match filtered_compact_oids { + Include(rbm) => rbm, + Exclude(rbm) => { + if let Some(reader) = record_segment_reader.as_ref() { + // TODO: Optimize offset limit performance + reader + .get_all_offset_ids() + .await + .map_err(|_| MetadataFilteringError::RecordSegmentReaderError)? + - rbm + } else { + RoaringBitmap::new() } } - None => { - tracing::info!("Where document not supplied by the user"); - None - } }; - let mut merged_result: Option> = None; - if mtsearch_res.is_none() && fts_result.is_some() { - merged_result = fts_result; - } else if mtsearch_res.is_some() && fts_result.is_none() { - merged_result = mtsearch_res; - } else if mtsearch_res.is_some() && fts_result.is_some() { - merged_result = Some(merge_sorted_vecs_conjunction( - &mtsearch_res.expect("Already validated that it is not none"), - &fts_result.expect("Already validated that it is not none"), - )); - } - - let mut filtered_index_offset_ids: Option> = None; - if input.where_clause.is_some() || input.where_document_clause.is_some() { - // Get offset ids that satisfy where conditions from storage. - let metadata_segment_reader = MetadataSegmentReader::from_segment( - &input.metadata_segment, - &input.blockfile_provider, - ) - .await - .map_err(MetadataFilteringError::MetadataSegmentReader)?; - - filtered_index_offset_ids = metadata_segment_reader - .query( - input.where_clause.as_ref(), - input.where_document_clause.as_ref(), - Some(&vec![]), - 0, - 0, - ) - .await - .map_err(MetadataFilteringError::MetadataSegmentReader)?; - } - // This will be sorted by offset id. - let mut filter_from_mt_segment: Option> = None; - if let Some(filtered_index_offset_ids) = filtered_index_offset_ids { - // convert to u32 and also filter out the ones present in the - // materialized log. This is strictly needed for correctness as - // the ids that satisfy the predicate in the metadata segment - // could have been updated more recently (in the log) to NOT - // satisfy the predicate, hence we treat the materialized log - // as the source of truth for ids that are present in both the - // places. - filter_from_mt_segment = Some( - filtered_index_offset_ids - .into_iter() - .map(|index| index as u32) - .filter(|x| !ids_in_mat_log.contains(x)) - .collect(), - ); + // Merge the materialized offset ids from the log and from the metadata segment + // The two roaring bitmaps involved here should be disjoint + let mut merged_oids = materialized_compact_oids | materialized_log_oids; + if let Some(skip) = input.offset.as_ref() { + merged_oids.remove_smallest(*skip as u64); } - // It cannot happen that one is none and other is some. - if (filter_from_mt_segment.is_some() && merged_result.is_none()) - || (filter_from_mt_segment.is_none() && merged_result.is_some()) - { - panic!("Invariant violation. 
Both should either be none or some"); - } - let mut where_condition_filtered_offset_ids = None; - if filter_from_mt_segment.is_some() && merged_result.is_some() { - where_condition_filtered_offset_ids = Some(merge_sorted_vecs_disjunction( - &filter_from_mt_segment.expect("Already checked that should be some"), - &merged_result.expect("Already checked that should be some"), - )); + if let Some(take) = input.limit.as_ref() { + merged_oids = merged_oids.into_iter().take(*take as usize).collect(); } - // Hydrate offset ids for user supplied ids. - // First from the log. - let mut user_supplied_offset_ids: Vec = vec![]; - let mut remaining_id_set: HashSet; - let query_ids_present; - match &input.query_ids { - Some(query_ids) => { - let query_ids_set: HashSet = HashSet::from_iter(query_ids.iter().cloned()); - query_ids_present = true; - remaining_id_set = query_ids.iter().cloned().collect(); - for (log_records, _) in mat_records.iter() { - let user_id = log_records.merged_user_id_ref(); - if query_ids_set.contains(user_id) { - remaining_id_set.remove(user_id); - if log_records.final_operation != MaterializedLogOperation::DeleteExisting { - user_supplied_offset_ids.push(log_records.offset_id); - } - } - } - tracing::info!( - "For user supplied query ids, filtered {} records from log, {} ids remain", - user_supplied_offset_ids.len(), - remaining_id_set.len() - ); - let record_segment_reader_2: Option; - match RecordSegmentReader::from_segment( - &input.record_segment, - &input.blockfile_provider, - ) - .await - { - Ok(reader) => { - record_segment_reader_2 = Some(reader); - } - Err(e) => { - match *e { - // Uninitialized segment is fine and means that the record - // segment is not yet initialized in storage. - RecordSegmentReaderCreationError::UninitializedSegment => { - record_segment_reader_2 = None; - } - RecordSegmentReaderCreationError::BlockfileOpenError(e) => { - tracing::error!("Error creating record segment reader {}", e); - return Err(MetadataFilteringError::RecordSegmentReaderCreation( - RecordSegmentReaderCreationError::BlockfileOpenError(e), - )); - } - RecordSegmentReaderCreationError::InvalidNumberOfFiles => { - tracing::error!("Error creating record segment reader {}", e); - return Err(MetadataFilteringError::RecordSegmentReaderCreation( - RecordSegmentReaderCreationError::InvalidNumberOfFiles, - )); - } - }; - } - }; - match &record_segment_reader_2 { - Some(r) => { - // Now read the remaining ids from storage. - for ids in remaining_id_set { - if let Ok(offset_id) = r.get_offset_id_for_user_id(ids.as_str()).await { - user_supplied_offset_ids.push(offset_id); - } - } - } - // It's ok for the user to supply a non existent id. - None => (), - } - } - None => { - query_ids_present = false; - } - } - // need to sort user_supplied_offset_ids by offset id. 
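+        // A minimal sketch of the pagination semantics applied to `merged_oids`
+        // above, assuming the `roaring` crate's `RoaringBitmap` API:
+        //
+        //     let mut oids: RoaringBitmap = (1..=10).collect();
+        //     oids.remove_smallest(3);                  // offset: drop ids 1..=3
+        //     let page: RoaringBitmap = oids.into_iter().take(4).collect();
+        //     assert_eq!(page, (4..=7).collect::<RoaringBitmap>());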
- user_supplied_offset_ids.sort(); - let mut filtered_offset_ids = None; - if query_ids_present { - tracing::info!( - "Filtered {} records (log + segment) based on user supplied ids", - user_supplied_offset_ids.len() - ); - filtered_offset_ids = Some(user_supplied_offset_ids); - } - return Ok(MetadataFilteringOutput { + Ok(MetadataFilteringOutput { log_records: input.log_record.clone(), - where_condition_filtered_offset_ids, - user_supplied_filtered_offset_ids: filtered_offset_ids, - }); + offset_ids: merged_oids, + }) } } #[cfg(test)] mod test { + use crate::execution::operator::Operator; use crate::{ - execution::{ - operator::Operator, - operators::metadata_filtering::{MetadataFilteringInput, MetadataFilteringOperator}, + execution::operators::metadata_filtering::{ + MetadataFilteringInput, MetadataFilteringOperator, }, segment::{ metadata_segment::MetadataSegmentWriter, @@ -718,9 +582,11 @@ mod test { use chroma_cache::{cache::Cache, config::CacheConfig, config::UnboundedCacheConfig}; use chroma_storage::{local::LocalStorage, Storage}; use chroma_types::{ - Chunk, DirectComparison, DirectDocumentComparison, LogRecord, Operation, OperationRecord, - UpdateMetadataValue, Where, WhereComparison, WhereDocument, + BooleanOperator, Chunk, DirectDocumentComparison, DirectWhereComparison, DocumentOperator, + LogRecord, MetadataSetValue, MetadataValue, Operation, OperationRecord, PrimitiveOperator, + SetOperator, UpdateMetadataValue, Where, WhereChildren, WhereComparison, }; + use roaring::RoaringBitmap; use std::{collections::HashMap, str::FromStr}; use uuid::Uuid; @@ -799,24 +665,27 @@ mod test { }, ]; let data: Chunk = Chunk::new(data.into()); - let record_segment_reader: Option = - match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await - { - Ok(reader) => Some(reader), - Err(e) => { - match *e { - // Uninitialized segment is fine and means that the record - // segment is not yet initialized in storage. - RecordSegmentReaderCreationError::UninitializedSegment => None, - RecordSegmentReaderCreationError::BlockfileOpenError(_) => { - panic!("Error creating record segment reader"); - } - RecordSegmentReaderCreationError::InvalidNumberOfFiles => { - panic!("Error creating record segment reader"); - } + let record_segment_reader: Option; + match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await { + Ok(reader) => { + record_segment_reader = Some(reader); + } + Err(e) => { + match *e { + // Uninitialized segment is fine and means that the record + // segment is not yet initialized in storage. 
+ RecordSegmentReaderCreationError::UninitializedSegment => { + record_segment_reader = None; } - } - }; + RecordSegmentReaderCreationError::BlockfileOpenError(_) => { + panic!("Error creating record segment reader"); + } + RecordSegmentReaderCreationError::InvalidNumberOfFiles => { + panic!("Error creating record segment reader"); + } + }; + } + }; let materializer = LogMaterializer::new(record_segment_reader, data, None); let mat_records = materializer .materialize() @@ -895,47 +764,38 @@ mod test { ]; let data: Chunk = Chunk::new(data.into()); let operator = MetadataFilteringOperator::new(); - let where_clause: Where = Where::DirectWhereComparison(DirectComparison { + let where_clause: Where = Where::DirectWhereComparison(DirectWhereComparison { key: String::from("hello"), - comparison: WhereComparison::SingleStringComparison( - String::from("new_world"), - chroma_types::WhereClauseComparator::Equal, + comparison: WhereComparison::Primitive( + PrimitiveOperator::Equal, + MetadataValue::Str(String::from("new_world")), ), }); let where_document_clause = - WhereDocument::DirectWhereDocumentComparison(DirectDocumentComparison { + Where::DirectWhereDocumentComparison(DirectDocumentComparison { document: String::from("about dogs"), - operator: chroma_types::WhereDocumentOperator::Contains, + operator: chroma_types::DocumentOperator::Contains, }); let input = MetadataFilteringInput::new( - data.clone(), + blockfile_provider.clone(), record_segment.clone(), metadata_segment.clone(), - blockfile_provider.clone(), - Some(where_clause), - Some(where_document_clause), + data.clone(), + None, + Some(Where::conjunction(vec![ + where_clause, + where_document_clause, + ])), + None, None, ); let res = operator .run(&input) .await .expect("Error during running of operator"); - assert_eq!(None, res.user_supplied_filtered_offset_ids); - assert_eq!( - 1, - res.where_condition_filtered_offset_ids - .clone() - .expect("Expected one document") - .len() - ); - assert_eq!( - 3, - *res.where_condition_filtered_offset_ids - .expect("Expected one document") - .first() - .expect("Expect not none") - ); + assert_eq!(1, res.offset_ids.len()); + assert_eq!(3, res.offset_ids.select(0).expect("Expect not none")); } #[tokio::test] @@ -1013,24 +873,27 @@ mod test { }, ]; let data: Chunk = Chunk::new(data.into()); - let record_segment_reader: Option = - match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await - { - Ok(reader) => Some(reader), - Err(e) => { - match *e { - // Uninitialized segment is fine and means that the record - // segment is not yet initialized in storage. - RecordSegmentReaderCreationError::UninitializedSegment => None, - RecordSegmentReaderCreationError::BlockfileOpenError(_) => { - panic!("Error creating record segment reader"); - } - RecordSegmentReaderCreationError::InvalidNumberOfFiles => { - panic!("Error creating record segment reader"); - } + let record_segment_reader: Option; + match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await { + Ok(reader) => { + record_segment_reader = Some(reader); + } + Err(e) => { + match *e { + // Uninitialized segment is fine and means that the record + // segment is not yet initialized in storage. 
+ RecordSegmentReaderCreationError::UninitializedSegment => { + record_segment_reader = None; } - } - }; + RecordSegmentReaderCreationError::BlockfileOpenError(_) => { + panic!("Error creating record segment reader"); + } + RecordSegmentReaderCreationError::InvalidNumberOfFiles => { + panic!("Error creating record segment reader"); + } + }; + } + }; let materializer = LogMaterializer::new(record_segment_reader, data, None); let mat_records = materializer .materialize() @@ -1085,18 +948,19 @@ mod test { }]; let data: Chunk = Chunk::new(data.into()); let operator = MetadataFilteringOperator::new(); - let where_clause: Where = Where::DirectWhereComparison(DirectComparison { + let where_clause: Where = Where::DirectWhereComparison(DirectWhereComparison { key: String::from("bye"), - comparison: WhereComparison::SingleStringComparison( - String::from("world"), - chroma_types::WhereClauseComparator::Equal, + comparison: WhereComparison::Primitive( + PrimitiveOperator::Equal, + MetadataValue::Str(String::from("world")), ), }); let input = MetadataFilteringInput::new( - data.clone(), + blockfile_provider.clone(), record_segment.clone(), metadata_segment.clone(), - blockfile_provider.clone(), + data.clone(), + None, Some(where_clause), None, None, @@ -1105,22 +969,17 @@ mod test { .run(&input) .await .expect("Error during running of operator"); - assert_eq!(None, res.user_supplied_filtered_offset_ids); + assert_eq!(2, res.offset_ids.len()); + // Already sorted. + assert_eq!( + 1, + res.offset_ids.select(0).expect("Expected not none value") + ); assert_eq!( 2, - res.where_condition_filtered_offset_ids - .clone() - .expect("Expected one document") - .len() + res.offset_ids.select(1).expect("Expected not none value") ); - let where_res = res - .where_condition_filtered_offset_ids - .expect("Expect not none") - .clone(); - // Already sorted. - assert_eq!(1, *where_res.first().expect("Expected not none value")); - assert_eq!(2, *where_res.get(1).expect("Expected not none value")); } #[tokio::test] @@ -1198,24 +1057,27 @@ mod test { }, ]; let data: Chunk = Chunk::new(data.into()); - let record_segment_reader: Option = - match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await - { - Ok(reader) => Some(reader), - Err(e) => { - match *e { - // Uninitialized segment is fine and means that the record - // segment is not yet initialized in storage. - RecordSegmentReaderCreationError::UninitializedSegment => None, - RecordSegmentReaderCreationError::BlockfileOpenError(_) => { - panic!("Error creating record segment reader"); - } - RecordSegmentReaderCreationError::InvalidNumberOfFiles => { - panic!("Error creating record segment reader"); - } + let record_segment_reader: Option; + match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await { + Ok(reader) => { + record_segment_reader = Some(reader); + } + Err(e) => { + match *e { + // Uninitialized segment is fine and means that the record + // segment is not yet initialized in storage. 
+ RecordSegmentReaderCreationError::UninitializedSegment => { + record_segment_reader = None; } - } - }; + RecordSegmentReaderCreationError::BlockfileOpenError(_) => { + panic!("Error creating record segment reader"); + } + RecordSegmentReaderCreationError::InvalidNumberOfFiles => { + panic!("Error creating record segment reader"); + } + }; + } + }; let materializer = LogMaterializer::new(record_segment_reader, data, None); let mat_records = materializer .materialize() @@ -1291,35 +1153,624 @@ mod test { let data: Chunk = Chunk::new(data.into()); let operator = MetadataFilteringOperator::new(); let input = MetadataFilteringInput::new( - data.clone(), + blockfile_provider.clone(), record_segment.clone(), metadata_segment.clone(), - blockfile_provider.clone(), - None, - None, + data.clone(), Some(vec![ String::from("embedding_id_1"), String::from("embedding_id_3"), ]), + None, + None, + None, ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(2, res.offset_ids.len()); + assert_eq!(1, res.offset_ids.select(0).expect("Expect not none value")); + assert_eq!(3, res.offset_ids.select(1).expect("Expect not none value")); + } + + #[tokio::test] + async fn test_composite_filter() { + let tmp_dir = tempfile::tempdir().unwrap(); + let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap())); + let block_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let sparse_index_cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {})); + let arrow_blockfile_provider = ArrowBlockfileProvider::new( + storage, + TEST_MAX_BLOCK_SIZE_BYTES, + block_cache, + sparse_index_cache, + ); + let blockfile_provider = + BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider); + let mut record_segment = chroma_types::Segment { + id: Uuid::from_str("00000000-0000-0000-0000-000000000000").expect("parse error"), + r#type: chroma_types::SegmentType::BlockfileRecord, + scope: chroma_types::SegmentScope::RECORD, + collection: Uuid::from_str("00000000-0000-0000-0000-000000000000") + .expect("parse error"), + metadata: None, + file_path: HashMap::new(), + }; + let mut metadata_segment = chroma_types::Segment { + id: Uuid::from_str("00000000-0000-0000-0000-000000000001").expect("parse error"), + r#type: chroma_types::SegmentType::BlockfileMetadata, + scope: chroma_types::SegmentScope::METADATA, + collection: Uuid::from_str("00000000-0000-0000-0000-000000000000") + .expect("parse error"), + metadata: None, + file_path: HashMap::new(), + }; + { + let segment_writer = + RecordSegmentWriter::from_segment(&record_segment, &blockfile_provider) + .await + .expect("Error creating segment writer"); + let mut metadata_writer = + MetadataSegmentWriter::from_segment(&metadata_segment, &blockfile_provider) + .await + .expect("Error creating segment writer"); + let mut logs = Vec::new(); + for i in 1..=60 { + let mut meta = HashMap::new(); + if i % 2 == 0 { + meta.insert("even".to_string(), UpdateMetadataValue::Bool(i % 4 == 0)); + } + meta.insert( + format!("mod_three_{}", i % 3), + UpdateMetadataValue::Float(i as f64), + ); + meta.insert("mod_five".to_string(), UpdateMetadataValue::Int(i % 5)); + let emb = (0..3).map(|o| (3 * i + o) as f32).collect(); + logs.push(LogRecord { + log_offset: i, + record: OperationRecord { + id: format!("id_{}", i), + embedding: Some(emb), + encoding: None, + metadata: Some(meta), + document: Some(format!("-->{}<--", i)), + operation: Operation::Add, + }, + }); + } + let data: Chunk = 
Chunk::new(logs.into()); + let record_segment_reader: Option; + match RecordSegmentReader::from_segment(&record_segment, &blockfile_provider).await { + Ok(reader) => { + record_segment_reader = Some(reader); + } + Err(e) => { + match *e { + // Uninitialized segment is fine and means that the record + // segment is not yet initialized in storage. + RecordSegmentReaderCreationError::UninitializedSegment => { + record_segment_reader = None; + } + RecordSegmentReaderCreationError::BlockfileOpenError(_) => { + panic!("Error creating record segment reader"); + } + RecordSegmentReaderCreationError::InvalidNumberOfFiles => { + panic!("Error creating record segment reader"); + } + }; + } + }; + let materializer = LogMaterializer::new(record_segment_reader, data, None); + let mat_records = materializer + .materialize() + .await + .expect("Log materialization failed"); + metadata_writer + .apply_materialized_log_chunk(mat_records.clone()) + .await + .expect("Apply materialized log to metadata segment failed"); + metadata_writer + .write_to_blockfiles() + .await + .expect("Write to blockfiles for metadata writer failed"); + segment_writer + .apply_materialized_log_chunk(mat_records) + .await + .expect("Apply materialized log to record segment failed"); + let record_flusher = segment_writer + .commit() + .expect("Commit for segment writer failed"); + let metadata_flusher = metadata_writer + .commit() + .expect("Commit for metadata writer failed"); + record_segment.file_path = record_flusher + .flush() + .await + .expect("Flush record segment writer failed"); + metadata_segment.file_path = metadata_flusher + .flush() + .await + .expect("Flush metadata segment writer failed"); + } + let mut logs = Vec::new(); + for i in 61..=120 { + let mut meta = HashMap::new(); + if i % 2 == 0 { + meta.insert("even".to_string(), UpdateMetadataValue::Bool(i % 4 == 0)); + } + meta.insert( + format!("mod_three_{}", i % 3), + UpdateMetadataValue::Float(i as f64), + ); + meta.insert("mod_five".to_string(), UpdateMetadataValue::Int(i % 5)); + let emb = (0..3).map(|o| (3 * i + o) as f32).collect(); + logs.push(LogRecord { + log_offset: i, + record: OperationRecord { + id: format!("id_{}", i), + embedding: Some(emb), + encoding: None, + metadata: Some(meta), + document: Some(format!("-->{}<--", i)), + operation: Operation::Add, + }, + }); + } + for i in 1..=20 { + logs.push(LogRecord { + log_offset: 120 + i, + record: OperationRecord { + id: format!("id_{}", i * 6), + embedding: None, + encoding: None, + metadata: None, + document: None, + operation: Operation::Delete, + }, + }); + } + let data: Chunk = Chunk::new(logs.into()); + let operator = MetadataFilteringOperator::new(); + + // Test set summary: + // Total records count: 120, with id 1-120 + // Records 1-60 are compacted + // Records 61-120 are in the log + // Records with id % 6 == 1 are deleted + // Record metadata has the following keys + // - even: only exists for even ids, value is a boolean matching id % 4 == 0 + // - mod_three_{id % 3}: a floating point value converted from id + // - mod_five: an integer value matching id % 5 + // Record document has format "-->{id}<--" + + let existing = (1..=120).filter(|i| i % 6 != 0); + + // A full scan should yield all existing records that are not yet deleted + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + None, + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of 
operator"); + assert_eq!(res.offset_ids, existing.clone().collect()); + + // A full scan within the user specified ids should yield matching records + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + Some((31..=90).map(|i| format!("id_{}", i)).collect()), + None, + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing.clone().filter(|i| &31 <= i && i <= &90).collect() + ); + + // A $eq check on metadata should yield matching records + let where_clause = Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Primitive(PrimitiveOperator::Equal, MetadataValue::Int(2)), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing.clone().filter(|i| i % 5 == 2).collect() + ); + + // A $ne check on metadata should yield matching records + let where_clause = Where::DirectWhereComparison(DirectWhereComparison { + key: "even".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::NotEqual, + MetadataValue::Bool(false), + ), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing + .clone() + .filter(|i| i % 2 == 1 || i % 4 == 0) + .collect() + ); + + // A $lte check on metadata should yield matching records + let where_clause = Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_three_2".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::LessThanOrEqual, + MetadataValue::Float(50.0), + ), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing + .clone() + .filter(|i| i % 3 == 2 && i <= &50) + .collect() + ); + + // A $contains check on document should yield matching records + let where_doc_clause = Where::DirectWhereDocumentComparison(DirectDocumentComparison { + operator: DocumentOperator::Contains, + document: String::from("6<-"), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_doc_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing.clone().filter(|i| i % 10 == 6).collect() + ); + + // A $not_contains check on document should yield matching records + let where_doc_clause = Where::DirectWhereDocumentComparison(DirectDocumentComparison { + operator: DocumentOperator::NotContains, + document: String::from("3<-"), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + 
metadata_segment.clone(), + data.clone(), + None, + Some(where_doc_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing.clone().filter(|i| i % 10 != 3).collect() + ); + + // A $in check on metadata should yield matching records + let where_clause = Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Set(SetOperator::In, MetadataSetValue::Int(vec![1, 3])), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing + .clone() + .filter(|i| i % 5 == 1 || i % 5 == 3) + .collect() + ); + + // A $in should behave like a disjunction of $eq + let contain_res = res.offset_ids; + let where_clause = Where::WhereChildren(WhereChildren { + operator: BooleanOperator::Or, + children: vec![ + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::Equal, + MetadataValue::Int(1), + ), + }), + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::Equal, + MetadataValue::Int(3), + ), + }), + ], + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(res.offset_ids, contain_res); + + // A $nin check on metadata should yield matching records + let where_clause = Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Set(SetOperator::NotIn, MetadataSetValue::Int(vec![1, 3])), + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!( + res.offset_ids, + existing + .clone() + .filter(|i| i % 5 != 1 && i % 5 != 3) + .collect() + ); + + // A $nin should behave like a conjunction of $neq + let contain_res = res.offset_ids; + let where_clause = Where::WhereChildren(WhereChildren { + operator: BooleanOperator::And, + children: vec![ + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::NotEqual, + MetadataValue::Int(1), + ), + }), + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::NotEqual, + MetadataValue::Int(3), + ), + }), + ], + }); + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + Some(where_clause), + None, + None, + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(res.offset_ids, contain_res); + + // offset and limit should yield the correct chunk of records + let input = 
MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + None, + Some(36), + Some(54), + ); + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(res.offset_ids, existing.clone().skip(36).take(54).collect()); + + // A large offset should yield no record + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + None, + Some(200), + None, + ); + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(res.offset_ids, RoaringBitmap::new()); + + // A large limit should yield all records + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + None, + None, + None, + Some(200), + ); + + let res = operator + .run(&input) + .await + .expect("Error during running of operator"); + assert_eq!(res.offset_ids, existing.clone().collect()); + + // Finally, test a composite filter with limit and offset + let where_clause = Where::WhereChildren(WhereChildren { + operator: BooleanOperator::And, + children: vec![ + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_three_0".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::GreaterThanOrEqual, + MetadataValue::Float(12.0), + ), + }), + Where::DirectWhereComparison(DirectWhereComparison { + key: "mod_five".to_string(), + comparison: WhereComparison::Set( + SetOperator::NotIn, + MetadataSetValue::Int(vec![0, 3]), + ), + }), + Where::WhereChildren(WhereChildren { + operator: BooleanOperator::Or, + children: vec![ + Where::DirectWhereDocumentComparison(DirectDocumentComparison { + operator: DocumentOperator::NotContains, + document: "6<-".to_string(), + }), + Where::DirectWhereComparison(DirectWhereComparison { + key: "even".to_string(), + comparison: WhereComparison::Primitive( + PrimitiveOperator::Equal, + MetadataValue::Bool(true), + ), + }), + ], + }), + ], + }); + + let input = MetadataFilteringInput::new( + blockfile_provider.clone(), + record_segment.clone(), + metadata_segment.clone(), + data.clone(), + Some((0..90).map(|i| format!("id_{}", i)).collect()), + Some(where_clause), + Some(2), + Some(7), + ); + let res = operator .run(&input) .await .expect("Error during running of operator"); - assert_eq!(None, res.where_condition_filtered_offset_ids); - let query_offset_id_vec = res - .user_supplied_filtered_offset_ids - .expect("Expected not none") - .clone(); - // Already sorted. - assert_eq!(2, query_offset_id_vec.len()); - assert_eq!( - 1, - *query_offset_id_vec.first().expect("Expect not none value") - ); assert_eq!( - 3, - *query_offset_id_vec.get(1).expect("Expect not none value") + res.offset_ids, + existing + .filter(|i| i % 3 == 0 + && i >= &12 + && i <= &90 + && i % 5 != 0 + && i % 5 != 3 + && (i % 10 != 6 || i % 4 == 0)) + .skip(2) + .take(7) + .collect() ); } }
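+
+// Worked instance of the final composite filter above: the surviving ids are the
+// odd multiples of 3 in 12..=90 whose remainder mod 5 is neither 0 nor 3, i.e.
+// {21, 27, 39, 51, 57, 69, 81, 87}; `offset = 2` then drops 21 and 27, and
+// `limit = 7` keeps the remaining six.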