feat: apply retry strategy during recovery (#1768)
yezizp2012 authored Apr 11, 2022
1 parent 2f04635 commit 0d867ec
Showing 2 changed files with 81 additions and 72 deletions.
2 changes: 0 additions & 2 deletions src/meta/src/barrier/mod.rs
@@ -162,8 +162,6 @@ impl<S> GlobalBarrierManager<S>
where
S: MetaStore,
{
const RECOVERY_RETRY_INTERVAL: Duration = Duration::from_millis(500);

/// Create a new [`crate::barrier::GlobalBarrierManager`].
pub fn new(
env: MetaSrvEnv<S>,
151 changes: 81 additions & 70 deletions src/meta/src/barrier/recovery.rs
@@ -13,6 +13,8 @@
// limitations under the License.

use std::collections::HashSet;
use std::iter::Map;
use std::time::Duration;

use futures::future::try_join_all;
use log::{debug, error};
@@ -24,6 +26,7 @@ use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, SyncSourcesRequest,
UpdateActorsRequest,
};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use uuid::Uuid;

use crate::barrier::command::CommandContext;
@@ -39,13 +42,25 @@ impl<S> GlobalBarrierManager<S>
where
S: MetaStore,
{
// Retry base interval in milliseconds.
const RECOVERY_RETRY_BASE_INTERVAL: u64 = 100;
// Retry max interval.
const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(10);

#[inline(always)]
/// Initialize a retry strategy for operation in recovery.
fn get_retry_strategy() -> Map<ExponentialBackoff, fn(Duration) -> Duration> {
ExponentialBackoff::from_millis(Self::RECOVERY_RETRY_BASE_INTERVAL)
.max_delay(Self::RECOVERY_RETRY_MAX_INTERVAL)
.map(jitter)
}

/// Recover the whole cluster from the latest epoch.
pub(crate) async fn recovery(
&self,
prev_epoch: u64,
prev_command: Option<Command>,
) -> RecoveryResult {
let mut new_epoch = self.env.epoch_generator().generate();
// Abort buffered schedules, they might be dirty already.
self.scheduled_barriers.abort().await;

@@ -55,35 +70,29 @@
}

debug!("recovery start!");
loop {
tokio::time::sleep(Self::RECOVERY_RETRY_INTERVAL).await;

let retry_category = Self::get_retry_strategy();
let (new_epoch, responses) = tokio_retry::Retry::spawn(retry_category, || async {
let info = self.resolve_actor_info(None).await;
let mut new_epoch = self.env.epoch_generator().generate();

// Reset all compute nodes, stop and drop existing actors.
if self
.reset_compute_nodes(&info, prev_epoch, new_epoch.into_inner())
.await
.is_err()
{
error!("reset_and_wait_compute_nodes failed");
continue;
}
self.reset_compute_nodes(&info, prev_epoch, new_epoch.into_inner())
.await;

// Refresh sources in local source manager of compute node.
if let Err(err) = self.sync_sources(&info).await {
error!("sync_sources failed: {}", err);
continue;
return Err(err);
}

// update and build all actors.
if let Err(err) = self.update_actors(&info).await {
error!("update_actors failed: {}", err);
continue;
return Err(err);
}
if let Err(err) = self.build_actors(&info).await {
error!("build_actors failed: {}", err);
continue;
return Err(err);
}

let prev_epoch = new_epoch.into_inner();
Expand All @@ -98,23 +107,32 @@ where
Command::checkpoint(),
);

let responses = self.inject_barrier(&command_ctx).await;

if responses.is_err() || command_ctx.post_collect().await.is_err() {
continue;
match self.inject_barrier(&command_ctx).await {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!("post_collect failed: {}", err);
return Err(err);
}
Ok((new_epoch, response))
}
Err(err) => {
error!("inject_barrier failed: {}", err);
Err(err)
}
}
})
.await
.expect("Retry until recovery success.");
debug!("recovery success");

debug!("recovery success");
return (
new_epoch,
self.fragment_manager.all_chain_actor_ids().await,
responses
.unwrap()
.into_iter()
.flat_map(|r| r.finished_create_mviews)
.collect(),
);
}
return (
new_epoch,
self.fragment_manager.all_chain_actor_ids().await,
responses
.into_iter()
.flat_map(|r| r.finished_create_mviews)
.collect(),
);
}

/// Clean up previous command dirty data. Currently, we only need to handle table fragments info
@@ -123,14 +141,12 @@
/// it.
async fn clean_up(&self, prev_command: Command) {
if let Some(table_id) = prev_command.creating_table_id() {
while self
.fragment_manager
.drop_table_fragments(&table_id)
.await
.is_err()
{
tokio::time::sleep(Self::RECOVERY_RETRY_INTERVAL).await;
}
let retry_category = Self::get_retry_strategy();
tokio_retry::Retry::spawn(retry_category, || async {
self.fragment_manager.drop_table_fragments(&table_id).await
})
.await
.expect("Retry clean up until success");
}
}

@@ -233,39 +249,34 @@
}

/// Reset all compute nodes by calling `force_stop_actors`.
async fn reset_compute_nodes(
&self,
info: &BarrierActorInfo,
prev_epoch: u64,
new_epoch: u64,
) -> Result<()> {
for worker_node in info.node_map.values() {
loop {
// force shutdown actors on running compute nodes
match self.env.stream_clients().get(worker_node).await {
Ok(client) => {
if client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch,
prev: prev_epoch,
}),
})
.await
.is_ok()
{
break;
}
}
Err(err) => {
debug!("failed to get client: {}", err);
}
}
async fn reset_compute_nodes(&self, info: &BarrierActorInfo, prev_epoch: u64, new_epoch: u64) {
let futures = info.node_map.iter().map(|(_, worker_node)| {
let retry_strategy = Self::get_retry_strategy();

async move {
tokio_retry::Retry::spawn(retry_strategy, || async {
let client = self.env.stream_clients().get(worker_node).await?;
debug!("force stop actors: {}", worker_node.id);
client
.to_owned()
.force_stop_actors(ForceStopActorsRequest {
request_id: Uuid::new_v4().to_string(),
epoch: Some(ProstEpoch {
curr: new_epoch,
prev: prev_epoch,
}),
})
.await
.to_rw_result()
})
.await
.expect("Force stop actors until success");

Ok::<_, RwError>(())
}
}
});

try_join_all(futures).await.unwrap();
debug!("all compute nodes have been reset.");
Ok(())
}
}
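
For context on the new retry behavior, the following standalone sketch (not part of the commit) shows what `get_retry_strategy` plus `tokio_retry::Retry::spawn` does: exponential backoff from a 100 ms base, capped at 10 s and jittered, retried until the wrapped async operation returns `Ok`. The `flaky_operation` helper and the dependency versions (tokio 1.x with the "macros"/"rt-multi-thread" features, tokio-retry 0.3) are assumptions for illustration.

```rust
// Standalone sketch (not RisingWave code) of the retry pattern introduced in this commit.
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// Hypothetical flaky operation standing in for `sync_sources` / `update_actors` /
// `build_actors`: it fails on the first two attempts and then succeeds.
static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

async fn flaky_operation() -> Result<u32, String> {
    let n = ATTEMPTS.fetch_add(1, Ordering::SeqCst) + 1;
    if n < 3 {
        Err(format!("transient failure on attempt {n}"))
    } else {
        Ok(n)
    }
}

#[tokio::main]
async fn main() {
    // Mirrors `get_retry_strategy`: exponential backoff from a 100 ms base,
    // capped at a 10 s delay, with jitter applied to every interval.
    let retry_strategy = ExponentialBackoff::from_millis(100)
        .max_delay(Duration::from_secs(10))
        .map(jitter);

    // Keeps retrying `flaky_operation` until it returns `Ok`, sleeping according
    // to the jittered backoff schedule between attempts.
    let attempts = Retry::spawn(retry_strategy, flaky_operation)
        .await
        .expect("retried until success");
    println!("succeeded after {attempts} attempts");
}
```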

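The `reset_compute_nodes` change combines this retry loop with `futures::future::try_join_all`, resetting every compute node concurrently while retrying each node independently. A hedged sketch of that shape, with a hypothetical `reset_worker` standing in for the `force_stop_actors` RPC:

```rust
// Hedged sketch of the `reset_compute_nodes` pattern: every worker is reset concurrently,
// and each reset is retried with its own backoff until it succeeds.
use std::time::Duration;

use futures::future::try_join_all;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// Hypothetical stand-in for the per-worker stop-actors RPC.
async fn reset_worker(worker_id: u32) -> Result<(), String> {
    println!("force stop actors on worker {worker_id}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let worker_ids = [1u32, 2, 3];

    // One future per worker; each wraps its RPC in an independent retry loop,
    // just like the per-node `tokio_retry::Retry::spawn` in the commit.
    let futures = worker_ids.into_iter().map(|worker_id| {
        let retry_strategy = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(10))
            .map(jitter);

        async move {
            Retry::spawn(retry_strategy, move || reset_worker(worker_id))
                .await
                .expect("retry resetting the worker until it succeeds");
            Ok::<_, String>(())
        }
    });

    // Wait until every worker has been reset (mirrors `try_join_all(futures).await`).
    try_join_all(futures).await.unwrap();
    println!("all workers have been reset");
}
```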