From 5223f7131dad37cf86771e2079bc04f7a6efb8b4 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 1 Jul 2024 13:40:43 -0700 Subject: [PATCH] .stop() does not need to be async --- rust/worker/src/execution/orchestration/compact.rs | 2 +- .../src/execution/orchestration/get_vectors.rs | 2 +- rust/worker/src/execution/orchestration/hnsw.rs | 2 +- rust/worker/src/execution/orchestration/metadata.rs | 4 ++-- rust/worker/src/lib.rs | 10 +++++----- rust/worker/src/system/types.rs | 13 +++++++------ 6 files changed, 17 insertions(+), 16 deletions(-) diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index 78e700ea71b..11b14318845 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -478,7 +478,7 @@ impl CompactOrchestrator { self.result_channel = Some(tx); let mut handle = self.system.clone().start_component(self); let result = rx.await; - handle.stop().await; + handle.stop(); result.unwrap() } } diff --git a/rust/worker/src/execution/orchestration/get_vectors.rs b/rust/worker/src/execution/orchestration/get_vectors.rs index d76a6f09063..1e881a29ae9 100644 --- a/rust/worker/src/execution/orchestration/get_vectors.rs +++ b/rust/worker/src/execution/orchestration/get_vectors.rs @@ -202,7 +202,7 @@ impl GetVectorsOrchestrator { self.result_channel = Some(tx); let mut handle = self.system.clone().start_component(self); let result = rx.await; - handle.stop().await; + handle.stop(); result.unwrap() } } diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs index 5ab6bf0c126..9f6a1451547 100644 --- a/rust/worker/src/execution/orchestration/hnsw.rs +++ b/rust/worker/src/execution/orchestration/hnsw.rs @@ -485,7 +485,7 @@ impl HnswQueryOrchestrator { self.result_channel = Some(tx); let mut handle = self.system.clone().start_component(self); let result = rx.await; - handle.stop().await; + handle.stop(); result.unwrap() } } diff --git a/rust/worker/src/execution/orchestration/metadata.rs b/rust/worker/src/execution/orchestration/metadata.rs index f6e1c5d7d03..5449e9b668b 100644 --- a/rust/worker/src/execution/orchestration/metadata.rs +++ b/rust/worker/src/execution/orchestration/metadata.rs @@ -361,7 +361,7 @@ impl CountQueryOrchestrator { self.result_channel = Some(tx); let mut handle = self.system.clone().start_component(self); let result = rx.await; - handle.stop().await; + handle.stop(); result.unwrap() } } @@ -719,7 +719,7 @@ impl MetadataQueryOrchestrator { self.result_channel = Some(tx); let mut handle = self.system.clone().start_component(self); let result = rx.await; - handle.stop().await; + handle.stop(); result.unwrap() } } diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index dbfe931d8cb..074dee97aa7 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -80,7 +80,7 @@ pub async fn query_service_entrypoint() { // Kubernetes will send SIGTERM to stop the pod gracefully // TODO: add more signal handling _ = sigterm.recv() => { - dispatcher_handle.stop().await; + dispatcher_handle.stop(); dispatcher_handle.join().await; system.stop().await; system.join().await; @@ -138,7 +138,7 @@ pub async fn compaction_service_entrypoint() { compaction_manager.set_system(system.clone()); let mut compaction_manager_handle = system.start_component(compaction_manager); - memberlist.subscribe(compaction_manager_handle.as_receiver()); + memberlist.subscribe(compaction_manager_handle.receiver()); let mut memberlist_handle = system.start_component(memberlist); @@ -154,11 +154,11 @@ pub async fn compaction_service_entrypoint() { // Kubernetes will send SIGTERM to stop the pod gracefully // TODO: add more signal handling _ = sigterm.recv() => { - memberlist_handle.stop().await; + memberlist_handle.stop(); memberlist_handle.join().await; - dispatcher_handle.stop().await; + dispatcher_handle.stop(); dispatcher_handle.join().await; - compaction_manager_handle.stop().await; + compaction_manager_handle.stop(); compaction_manager_handle.join().await; system.stop().await; system.join().await; diff --git a/rust/worker/src/system/types.rs b/rust/worker/src/system/types.rs index 4313d707a19..a369d9a0164 100644 --- a/rust/worker/src/system/types.rs +++ b/rust/worker/src/system/types.rs @@ -2,8 +2,9 @@ use super::{scheduler::Scheduler, wrap, ChannelError, WrappedMessage}; use async_trait::async_trait; use core::panic; use futures::Stream; +use parking_lot::Mutex; use std::{fmt::Debug, sync::Arc}; -use tokio::{sync::Mutex, task::JoinError}; +use tokio::task::JoinError; use super::{system::System, ReceiverForMessage}; @@ -185,8 +186,8 @@ impl ComponentHandle { } } - pub(crate) async fn stop(&mut self) { - let mut state = self.state.lock().await; + pub(crate) fn stop(&mut self) { + let mut state = self.state.lock(); self.cancellation_token.cancel(); *state = ComponentState::Stopped; } @@ -201,10 +202,10 @@ impl ComponentHandle { } pub(crate) async fn state(&self) -> ComponentState { - return self.state.lock().await.clone(); + return self.state.lock().clone(); } - pub(crate) fn as_receiver(&self) -> Box> + pub(crate) fn receiver(&self) -> Box> where C: Component + Handler, M: Debug + Send + 'static, @@ -318,7 +319,7 @@ mod tests { tokio::task::yield_now().await; // With the streaming data and the messages we should have 12 assert_eq!(counter.load(Ordering::SeqCst), 12); - handle.stop().await; + handle.stop(); // Yield to allow the component to stop tokio::task::yield_now().await; // Expect the component to be stopped