Skip to content

Commit

Permalink
.stop() does not need to be async
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jul 1, 2024
1 parent f2c1305 commit 5223f71
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 16 deletions.
2 changes: 1 addition & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl CompactOrchestrator {
self.result_channel = Some(tx);
let mut handle = self.system.clone().start_component(self);
let result = rx.await;
handle.stop().await;
handle.stop();
result.unwrap()
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/orchestration/get_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl GetVectorsOrchestrator {
self.result_channel = Some(tx);
let mut handle = self.system.clone().start_component(self);
let result = rx.await;
handle.stop().await;
handle.stop();
result.unwrap()
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/orchestration/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ impl HnswQueryOrchestrator {
self.result_channel = Some(tx);
let mut handle = self.system.clone().start_component(self);
let result = rx.await;
handle.stop().await;
handle.stop();
result.unwrap()
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/orchestration/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl CountQueryOrchestrator {
self.result_channel = Some(tx);
let mut handle = self.system.clone().start_component(self);
let result = rx.await;
handle.stop().await;
handle.stop();
result.unwrap()
}
}
Expand Down Expand Up @@ -719,7 +719,7 @@ impl MetadataQueryOrchestrator {
self.result_channel = Some(tx);
let mut handle = self.system.clone().start_component(self);
let result = rx.await;
handle.stop().await;
handle.stop();
result.unwrap()
}
}
Expand Down
10 changes: 5 additions & 5 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn query_service_entrypoint() {
// Kubernetes will send SIGTERM to stop the pod gracefully
// TODO: add more signal handling
_ = sigterm.recv() => {
dispatcher_handle.stop().await;
dispatcher_handle.stop();
dispatcher_handle.join().await;
system.stop().await;
system.join().await;
Expand Down Expand Up @@ -138,7 +138,7 @@ pub async fn compaction_service_entrypoint() {
compaction_manager.set_system(system.clone());

let mut compaction_manager_handle = system.start_component(compaction_manager);
memberlist.subscribe(compaction_manager_handle.as_receiver());
memberlist.subscribe(compaction_manager_handle.receiver());

let mut memberlist_handle = system.start_component(memberlist);

Expand All @@ -154,11 +154,11 @@ pub async fn compaction_service_entrypoint() {
// Kubernetes will send SIGTERM to stop the pod gracefully
// TODO: add more signal handling
_ = sigterm.recv() => {
memberlist_handle.stop().await;
memberlist_handle.stop();
memberlist_handle.join().await;
dispatcher_handle.stop().await;
dispatcher_handle.stop();
dispatcher_handle.join().await;
compaction_manager_handle.stop().await;
compaction_manager_handle.stop();
compaction_manager_handle.join().await;
system.stop().await;
system.join().await;
Expand Down
13 changes: 7 additions & 6 deletions rust/worker/src/system/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use super::{scheduler::Scheduler, wrap, ChannelError, WrappedMessage};
use async_trait::async_trait;
use core::panic;
use futures::Stream;
use parking_lot::Mutex;
use std::{fmt::Debug, sync::Arc};
use tokio::{sync::Mutex, task::JoinError};
use tokio::task::JoinError;

use super::{system::System, ReceiverForMessage};

Expand Down Expand Up @@ -185,8 +186,8 @@ impl<C: Component> ComponentHandle<C> {
}
}

pub(crate) async fn stop(&mut self) {
let mut state = self.state.lock().await;
pub(crate) fn stop(&mut self) {
let mut state = self.state.lock();
self.cancellation_token.cancel();
*state = ComponentState::Stopped;
}
Expand All @@ -201,10 +202,10 @@ impl<C: Component> ComponentHandle<C> {
}

pub(crate) async fn state(&self) -> ComponentState {
return self.state.lock().await.clone();
return self.state.lock().clone();
}

pub(crate) fn as_receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
pub(crate) fn receiver<M>(&self) -> Box<dyn ReceiverForMessage<M>>
where
C: Component + Handler<M>,
M: Debug + Send + 'static,
Expand Down Expand Up @@ -318,7 +319,7 @@ mod tests {
tokio::task::yield_now().await;
// With the streaming data and the messages we should have 12
assert_eq!(counter.load(Ordering::SeqCst), 12);
handle.stop().await;
handle.stop();
// Yield to allow the component to stop
tokio::task::yield_now().await;
// Expect the component to be stopped
Expand Down

0 comments on commit 5223f71

Please sign in to comment.