Skip to content

Commit

Permalink
Make remote cache writes be async (#11479)
Browse files Browse the repository at this point in the history
Closes #11434. This should avoid writing to the cache from noticeably slowing down Pants's performance.

This does add a new risk that some cache writes will not happen, particularly when Pantsd is not in use. To assess the problem, we add new metrics for when a cache write starts and finishes. If it becomes a serious problem, we can revisit this, including possibly requiring that Pantsd be used for remote cache writes.
  • Loading branch information
Eric-Arellano authored Jan 22, 2021
1 parent 4e23ed4 commit 3059645
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 47 deletions.
55 changes: 37 additions & 18 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_trait::async_trait;
use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
use bazel_protos::require_digest;
use fs::RelativePath;
use futures::FutureExt;
use hashing::Digest;
use remexec::action_cache_client::ActionCacheClient;
use remexec::{ActionResult, Command, FileNode, Tree};
Expand Down Expand Up @@ -35,6 +36,7 @@ use crate::{
pub struct CommandRunner {
underlying: Arc<dyn crate::CommandRunner>,
metadata: ProcessMetadata,
executor: task_executor::Executor,
store: Store,
action_cache_client: Arc<ActionCacheClient<Channel>>,
headers: BTreeMap<String, String>,
Expand All @@ -48,6 +50,7 @@ impl CommandRunner {
pub fn new(
underlying: Arc<dyn crate::CommandRunner>,
metadata: ProcessMetadata,
executor: task_executor::Executor,
store: Store,
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
Expand Down Expand Up @@ -119,6 +122,7 @@ impl CommandRunner {
Ok(CommandRunner {
underlying,
metadata,
executor,
store,
action_cache_client,
headers,
Expand Down Expand Up @@ -485,7 +489,8 @@ impl crate::CommandRunner for CommandRunner {
None
}
}
};
}
.boxed();

// We speculate between reading from the remote cache vs. running locally. If there was a
// cache hit, we return early because there will be no need to write to the cache. Otherwise,
Expand All @@ -512,23 +517,37 @@ impl crate::CommandRunner for CommandRunner {
};

if result.exit_code == 0 && self.cache_write {
let write_result = self
.update_action_cache(
&context,
&request,
&result,
&self.metadata,
&command,
action_digest,
command_digest,
)
.await;
if let Err(err) = write_result {
log::warn!("Failed to write to remote cache: {}", err);
context
.workunit_store
.increment_counter(Metric::RemoteCacheWriteErrors, 1);
};
context
.workunit_store
.increment_counter(Metric::RemoteCacheWriteStarted, 1);
let command_runner = self.clone();
let result = result.clone();
// NB: We use `TaskExecutor::spawn` instead of `tokio::spawn` to ensure logging still works.
let _write_join = self.executor.spawn(
async move {
let write_result = command_runner
.update_action_cache(
&context,
&request,
&result,
&command_runner.metadata,
&command,
action_digest,
command_digest,
)
.await;
context
.workunit_store
.increment_counter(Metric::RemoteCacheWriteFinished, 1);
if let Err(err) = write_result {
log::warn!("Failed to write to remote cache: {}", err);
context
.workunit_store
.increment_counter(Metric::RemoteCacheWriteErrors, 1);
};
}
.boxed(),
);
}

Ok(result)
Expand Down
89 changes: 63 additions & 26 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ struct StoreSetup {
pub store: Store,
pub store_dir: PathBuf,
pub cas: StubCAS,
pub executor: task_executor::Executor,
}

impl StoreSetup {
pub fn new() -> StoreSetup {
let runtime = task_executor::Executor::new();
let executor = task_executor::Executor::new();
let cas = StubCAS::builder().build();
let store_dir = TempDir::new().unwrap().path().join("store_dir");
let store = Store::with_remote(
runtime,
executor.clone(),
store_dir.clone(),
vec![cas.address()],
None,
Expand All @@ -101,6 +102,7 @@ impl StoreSetup {
store,
store_dir,
cas,
executor,
}
}
}
Expand All @@ -120,16 +122,18 @@ fn create_local_runner(

fn create_cached_runner(
local: Box<dyn CommandRunnerTrait>,
store: Store,
store_setup: &StoreSetup,
read_delay_ms: u64,
write_delay_ms: u64,
eager_fetch: bool,
) -> (Box<dyn CommandRunnerTrait>, StubActionCache) {
let action_cache = StubActionCache::new(read_delay_ms).unwrap();
let action_cache = StubActionCache::new_with_delays(read_delay_ms, write_delay_ms).unwrap();
let runner = Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
ProcessMetadata::default(),
store,
store_setup.executor.clone(),
store_setup.store.clone(),
&action_cache.address(),
None,
None,
Expand Down Expand Up @@ -180,9 +184,8 @@ fn insert_into_action_cache(
async fn cache_read_success() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
Expand All @@ -201,9 +204,8 @@ async fn cache_read_success() {
async fn cache_read_skipped_on_errors() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
Expand All @@ -227,9 +229,9 @@ async fn cache_read_eager_fetch() {

async fn run_process(eager_fetch: bool) -> (i32, usize) {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), 0, eager_fetch);
create_cached_runner(local_runner, &store_setup, 0, 0, eager_fetch);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(
Expand Down Expand Up @@ -266,12 +268,8 @@ async fn cache_read_speculation() {
async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms);
let (cache_runner, action_cache) = create_cached_runner(
local_runner,
store_setup.store.clone(),
remote_delay_ms,
false,
);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, remote_delay_ms, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
if cache_hit {
Expand Down Expand Up @@ -308,9 +306,8 @@ async fn cache_read_speculation() {
async fn cache_write_success() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(0, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let (process, action_digest) = create_process(&store_setup.store).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
Expand All @@ -323,6 +320,8 @@ async fn cache_write_success() {
assert_eq!(local_result.exit_code, 0);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1);

// Wait for the cache write block to finish.
delay_for(Duration::from_millis(100)).await;
assert_eq!(action_cache.action_map.lock().len(), 1);
let action_map_mutex_guard = action_cache.action_map.lock();
assert_eq!(
Expand All @@ -338,9 +337,8 @@ async fn cache_write_success() {
async fn cache_write_not_for_failures() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let (process, _action_digest) = create_process(&store_setup.store).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
Expand All @@ -353,7 +351,45 @@ async fn cache_write_not_for_failures() {
assert_eq!(local_result.exit_code, 1);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1);

// Wait for the cache write block to finish.
delay_for(Duration::from_millis(100)).await;
assert!(action_cache.action_map.lock().is_empty());
}

/// Cache writes should be async and not block the CommandRunner from returning.
#[tokio::test]
async fn cache_write_does_not_block() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, 0, 100, false);
let (process, action_digest) = create_process(&store_setup.store).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());

let local_result = cache_runner
.run(process.clone().into(), Context::default())
.await
.unwrap();
assert_eq!(local_result.exit_code, 0);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1);

// We expect the cache write to have not finished yet, even though we already finished
// CommandRunner::run().
assert!(action_cache.action_map.lock().is_empty());

delay_for(Duration::from_millis(200)).await;
assert_eq!(action_cache.action_map.lock().len(), 1);
let action_map_mutex_guard = action_cache.action_map.lock();
assert_eq!(
action_map_mutex_guard
.get(&action_digest.0)
.unwrap()
.exit_code,
0
);
}

#[tokio::test]
Expand Down Expand Up @@ -503,10 +539,11 @@ async fn make_action_result_basic() {
.expect("Error saving directory");

let mock_command_runner = Arc::new(MockCommandRunner);
let action_cache = StubActionCache::new(0).unwrap();
let action_cache = StubActionCache::new().unwrap();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
ProcessMetadata::default(),
executor.clone(),
store.clone(),
&action_cache.address(),
None,
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ async fn remote_workunits_are_stored() {
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
let action_cache = mock::StubActionCache::new(0).unwrap();
let action_cache = mock::StubActionCache::new().unwrap();
let (command_runner, _store) =
create_command_runner(action_cache.address(), &cas, Platform::Linux);

Expand Down Expand Up @@ -2301,7 +2301,7 @@ async fn extract_execute_response(
operation: Operation,
remote_platform: Platform,
) -> Result<RemoteTestResult, ExecutionError> {
let action_cache = mock::StubActionCache::new(0).expect("failed to create action cache");
let action_cache = mock::StubActionCache::new().expect("failed to create action cache");

let cas = mock::StubCAS::builder()
.file(&TestData::roland())
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Core {
Box::new(process_execution::remote_cache::CommandRunner::new(
local_command_runner.into(),
process_execution_metadata.clone(),
executor.clone(),
full_store.clone(),
action_cache_address.as_str(),
root_ca_certs.clone(),
Expand Down
10 changes: 9 additions & 1 deletion src/rust/engine/testutil/mock/src/action_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ActionCacheResponder {
action_map: Arc<Mutex<HashMap<Fingerprint, ActionResult>>>,
always_errors: Arc<AtomicBool>,
read_delay: Duration,
write_delay: Duration,
}

#[tonic::async_trait]
Expand Down Expand Up @@ -107,6 +108,8 @@ impl ActionCache for ActionCacheResponder {
&self,
request: Request<UpdateActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
delay_for(self.write_delay).await;

let request = request.into_inner();

let action_digest: Digest = match require_digest(request.action_digest.as_ref()) {
Expand Down Expand Up @@ -135,13 +138,18 @@ impl ActionCache for ActionCacheResponder {
}

impl StubActionCache {
pub fn new(read_delay_ms: u64) -> Result<Self, String> {
pub fn new() -> Result<Self, String> {
Self::new_with_delays(0, 0)
}

pub fn new_with_delays(read_delay_ms: u64, write_delay_ms: u64) -> Result<Self, String> {
let action_map = Arc::new(Mutex::new(HashMap::new()));
let always_errors = Arc::new(AtomicBool::new(false));
let responder = ActionCacheResponder {
action_map: action_map.clone(),
always_errors: always_errors.clone(),
read_delay: Duration::from_millis(read_delay_ms),
write_delay: Duration::from_millis(write_delay_ms),
};

let addr = "127.0.0.1:0"
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub enum Metric {
RemoteCacheRequestsUncached,
RemoteCacheReadErrors,
RemoteCacheWriteErrors,
RemoteCacheWriteStarted,
RemoteCacheWriteFinished,
RemoteCacheSpeculationLocalCompletedFirst,
RemoteCacheSpeculationRemoteCompletedFirst,
RemoteExecutionErrors,
Expand Down Expand Up @@ -68,6 +70,8 @@ impl Metric {
RemoteCacheRequestsUncached => "remote_cache_requests_uncached",
RemoteCacheReadErrors => "remote_cache_read_errors",
RemoteCacheWriteErrors => "remote_cache_write_errors",
RemoteCacheWriteStarted => "remote_cache_write_started",
RemoteCacheWriteFinished => "remote_cache_write_finished",
RemoteCacheSpeculationLocalCompletedFirst => "remote_cache_speculation_local_completed_first",
RemoteCacheSpeculationRemoteCompletedFirst => {
"remote_cache_speculation_remote_completed_first"
Expand Down

0 comments on commit 3059645

Please sign in to comment.