Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Jul 12, 2024
1 parent 74fc086 commit 8e97d51
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 33 deletions.
4 changes: 2 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -222,12 +221,12 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
assignment_policy,
);

// TODO: hnsw index provider should be injected somehow
let blockfile_provider = BlockfileProvider::try_from_config(&(
config.blockfile_provider.clone(),
storage.clone(),
))
.await?;

let hnsw_index_provider =
HnswIndexProvider::try_from_config(&(config.hnsw_provider.clone(), storage.clone()))
.await?;
Expand Down Expand Up @@ -317,6 +316,7 @@ mod tests {
use crate::types::OperationRecord;
use crate::types::Segment;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use uuid::Uuid;

Expand Down
29 changes: 2 additions & 27 deletions rust/worker/src/index/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl HnswIndexProvider {
return Err(Box::new(HnswIndexProviderCreateError::HnswConfigError(*e)));
}
};

// HnswIndex init is not thread safe. We should not call it from multiple threads
let index = match HnswIndex::init(&index_config, Some(&hnsw_config), id) {
Ok(index) => index,
Err(e) => {
Expand All @@ -325,14 +325,7 @@ impl HnswIndexProvider {
Ok(index)
}

pub(crate) fn commit(&self, id: &Uuid) -> Result<(), Box<HnswIndexProviderCommitError>> {
let index = match self.cache.get(id) {
Some(index) => index,
None => {
return Err(Box::new(HnswIndexProviderCommitError::NoIndexFound(*id)));
}
};

pub(crate) fn commit(&self, index: Arc<RwLock<HnswIndex>>) -> Result<(), Box<dyn ChromaError>> {
match index.write().save() {
Ok(_) => {}
Err(e) => {
Expand All @@ -344,24 +337,6 @@ impl HnswIndexProvider {
}

pub(crate) async fn flush(&self, id: &Uuid) -> Result<(), Box<HnswIndexProviderFlushError>> {
// Scope to drop the cache lock before we await to write to s3
// TODO: since we commit(), we don't need to save the index here
{
// let cache = self.cache.read();
let index = match self.cache.get(id) {
Some(index) => index,
None => {
return Err(Box::new(HnswIndexProviderFlushError::NoIndexFound(*id)));
}
};
match index.write().save() {
Ok(_) => {}
Err(e) => {
return Err(Box::new(HnswIndexProviderFlushError::HnswSaveError(e)));
}
};
}

let index_storage_path = self.temporary_storage_path.join(id.to_string());
for file in FILES.iter() {
let file_path = index_storage_path.join(file);
Expand Down
7 changes: 3 additions & 4 deletions rust/worker/src/segment/distributed_hnsw_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::index::hnsw_provider::{
HnswIndexProviderFlushError, HnswIndexProviderForkError, HnswIndexProviderOpenError,
};
use crate::index::{
self, HnswIndex, HnswIndexConfig, HnswIndexFromSegmentError, Index, IndexConfig,
HnswIndex, HnswIndexConfig, HnswIndexFromSegmentError, Index, IndexConfig,
IndexConfigFromSegmentError,
};
use crate::types::{LogRecord, Operation, Segment};
use crate::types::{Operation, Segment};
use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
Expand Down Expand Up @@ -247,8 +247,7 @@ impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
}

fn commit(self) -> Result<impl SegmentFlusher, Box<dyn ChromaError>> {
let hnsw_index_id = self.index.read().id;
let res = self.hnsw_index_provider.commit(&hnsw_index_id);
let res = self.hnsw_index_provider.commit(self.index.clone());
match res {
Ok(_) => Ok(self),
Err(e) => Err(e),
Expand Down

0 comments on commit 8e97d51

Please sign in to comment.