Skip to content

Commit

Permalink
Tuning and combined improvements for perf and observability
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 14, 2024
1 parent a7e81b0 commit cfe427d
Show file tree
Hide file tree
Showing 18 changed files with 533 additions and 239 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ codegen-units = 1
# Let's be defensive and abort on every panic
panic = "abort"

[profile.release-debug]
inherits = "release"
debug = true

[profile.dev]
# Let's be defensive and abort on every panic
panic = "abort"
Expand Down
8 changes: 3 additions & 5 deletions crates/bifrost/benches/append_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::StreamExt;
use restate_bifrost::{Bifrost, BifrostService};
use restate_core::metadata;
use restate_rocksdb::{DbName, Owner, RocksDbManager};
use restate_rocksdb::{DbName, RocksDbManager};
use restate_types::config::{
BifrostOptionsBuilder, CommonOptionsBuilder, ConfigurationBuilder, LocalLogletOptionsBuilder,
};
Expand Down Expand Up @@ -174,14 +174,12 @@ fn write_throughput_local_loglet(c: &mut Criterion) {
group.finish();
let db_manager = RocksDbManager::get();

let db = db_manager
.get_db(Owner::Bifrost, DbName::new("local-loglet"))
.unwrap();
let db = db_manager.get_db(DbName::new("local-loglet")).unwrap();
let stats = db.get_statistics_str();
let total_wb_usage = db_manager.get_total_write_buffer_usage();
let wb_capacity = db_manager.get_total_write_buffer_capacity();
let memory = db_manager
.get_memory_usage_stats(&[(Owner::Bifrost, DbName::new("local-loglet"))])
.get_memory_usage_stats(&[DbName::new("local-loglet")])
.unwrap();
test_runner_rt.block_on(tc.shutdown_node("completed", 0));
test_runner_rt.block_on(RocksDbManager::get().shutdown());
Expand Down
32 changes: 12 additions & 20 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::sync::Arc;

use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
Expand Down Expand Up @@ -58,23 +58,18 @@ impl RocksDbLogStore {

let data_dir = options.data_dir();

let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::Bifrost,
data_dir,
db_options(options),
)
.add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
.add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
// not very important but it's to reduce the number of merges by flushing.
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
.ensure_column_families(cfs)
.build_as_db();
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, db_options(options))
.add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
.add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
// not very important but it's to reduce the number of merges by flushing.
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
.ensure_column_families(cfs)
.build_as_db();
let db_name = db_spec.name().clone();
// todo: use the returned rocksdb object when open_db returns Arc<RocksDb>
let _ = db_manager.open_db(updateable_options, db_spec)?;
let rocksdb = db_manager.get_db(Owner::Bifrost, db_name).unwrap();
let rocksdb = db_manager.get_db(db_name).unwrap();
Ok(Self { rocksdb })
}

Expand Down Expand Up @@ -138,6 +133,7 @@ fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options {
//
// Set compactions per level
//
opts.set_max_write_buffer_number(2);
opts.set_num_levels(7);
opts.set_compression_per_level(&[
DBCompressionType::None,
Expand Down Expand Up @@ -165,11 +161,7 @@ fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options {
DBCompressionType::None,
DBCompressionType::Zstd,
]);
//
// Most of the changes are highly temporal, we try to delay flushing
// to merge metadata updates into fewer L0 files.
opts.set_max_write_buffer_number(3);
opts.set_min_write_buffer_number_to_merge(3);
opts.set_max_write_buffer_number(2);
opts.set_max_successive_merges(10);
// Merge operator for log state updates
opts.set_merge_operator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl LogStoreWriter {
if self.manual_wal_flush {
// WAL flush is done in the foreground, but sync will happen in the background to avoid
// blocking IO.
if let Err(e) = self.rocksdb.inner().flush_wal(opts.sync_wal_before_ack) {
if let Err(e) = self.rocksdb.flush_wal(opts.sync_wal_before_ack).await {
warn!("Failed to flush rocksdb WAL in local loglet : {}", e);
self.send_acks(Err(Error::LogStoreError(e.into())));
return;
Expand Down
42 changes: 28 additions & 14 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ use codederror::CodedError;
use restate_core::cancellation_watcher;
use restate_core::metadata_store::{Precondition, VersionedValue};
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDbManager, RocksError,
CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager,
RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::RocksDbOptions;
use restate_types::storage::{
StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError,
};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, Options, WriteOptions, DB};
use rocksdb::{BoundColumnFamily, Options, WriteBatch, WriteOptions, DB};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -63,6 +64,8 @@ pub enum MetadataStoreRequest {
pub enum Error {
#[error("storage error: {0}")]
Storage(#[from] rocksdb::Error),
#[error("rocksdb error: {0}")]
RocksDb(#[from] RocksError),
#[error("failed precondition: {0}")]
FailedPrecondition(String),
#[error("invalid argument: {0}")]
Expand Down Expand Up @@ -99,6 +102,7 @@ pub enum BuildError {
/// store in a single thread.
pub struct LocalMetadataStore {
db: Arc<DB>,
rocksdb: Arc<RocksDb>,
opts: Box<dyn Updateable<RocksDbOptions> + Send + 'static>,
request_rx: RequestReceiver,
buffer: BytesMut,
Expand All @@ -119,11 +123,11 @@ impl LocalMetadataStore {
{
let (request_tx, request_rx) = mpsc::channel(request_queue_length);

let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::MetadataStore,
db_name.clone(),
data_dir.as_ref().to_path_buf(),
Options::default(),
)
Expand All @@ -132,9 +136,13 @@ impl LocalMetadataStore {
.build_as_db();

let db = db_manager.open_db(rocksdb_options(), db_spec)?;
let rocksdb = db_manager
.get_db(db_name)
.expect("metadata store db is open");

Ok(Self {
db,
rocksdb,
opts: Box::new(rocksdb_options()),
buffer: BytesMut::default(),
request_rx,
Expand Down Expand Up @@ -204,7 +212,7 @@ impl LocalMetadataStore {
precondition,
result_tx,
} => {
let result = self.put(&key, &value, precondition);
let result = self.put(&key, &value, precondition).await;
Self::log_error(&result, "Put");
let _ = result_tx.send(result);
}
Expand Down Expand Up @@ -244,43 +252,49 @@ impl LocalMetadataStore {
}
}

fn put(
async fn put(
&mut self,
key: &ByteString,
value: &VersionedValue,
precondition: Precondition,
) -> Result<()> {
match precondition {
Precondition::None => Ok(self.write_versioned_kv_pair(key, value)?),
Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?),
Precondition::DoesNotExist => {
let current_version = self.get_version(key)?;
if current_version.is_none() {
Ok(self.write_versioned_kv_pair(key, value)?)
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::kv_pair_exists())
}
}
Precondition::MatchesVersion(version) => {
let current_version = self.get_version(key)?;
if current_version == Some(version) {
Ok(self.write_versioned_kv_pair(key, value)?)
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::version_mismatch(version, current_version))
}
}
}
}

fn write_versioned_kv_pair(&mut self, key: &ByteString, value: &VersionedValue) -> Result<()> {
async fn write_versioned_kv_pair(
&mut self,
key: &ByteString,
value: &VersionedValue,
) -> Result<()> {
self.buffer.clear();
Self::encode(value, &mut self.buffer)?;

let write_options = self.write_options();
let cf_handle = self.kv_cf_handle();
self.db
.put_cf_opt(&cf_handle, key, self.buffer.as_ref(), &write_options)?;

Ok(())
let mut wb = WriteBatch::default();
wb.put_cf(&cf_handle, key, self.buffer.as_ref());
Ok(self
.rocksdb
.write_batch(Priority::High, IoMode::default(), write_options, wb)
.await?)
}

fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> {
Expand Down
13 changes: 5 additions & 8 deletions crates/node/src/network_server/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ const ROCKSDB_DB_PROPERTIES: &[(&str, MetricUnit)] = &[
("rocksdb.block-cache-capacity", MetricUnit::Bytes),
("rocksdb.block-cache-usage", MetricUnit::Bytes),
("rocksdb.block-cache-pinned-usage", MetricUnit::Bytes),
("rocksdb.num-running-flushes", MetricUnit::Count),
];

// Per column-family properties
Expand All @@ -161,7 +162,6 @@ const ROCKSDB_CF_PROPERTIES: &[(&str, MetricUnit)] = &[
"rocksdb.estimate-pending-compaction-bytes",
MetricUnit::Bytes,
),
("rocksdb.num-running-flushes", MetricUnit::Count),
("rocksdb.num-running-compactions", MetricUnit::Count),
("rocksdb.actual-delayed-write-rate", MetricUnit::Count),
("rocksdb.num-files-at-level0", MetricUnit::Count),
Expand Down Expand Up @@ -203,13 +203,10 @@ pub async fn render_metrics(State(state): State<NodeCtrlHandlerState>) -> String
);

for db in &all_dbs {
let labels = vec![
format!("db=\"{}\"", formatting::sanitize_label_value(&db.name)),
format!(
"owner=\"{}\"",
formatting::sanitize_label_value(db.owner.into())
),
];
let labels = vec![format!(
"db=\"{}\"",
formatting::sanitize_label_value(&db.name)
)];
// Tickers (Counters)
for ticker in ROCKSDB_TICKERS {
format_rocksdb_stat_ticker_for_prometheus(&mut out, db, &labels, *ticker);
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/network_server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use restate_types::config::CommonOptions;
/// Be mindful when adding new labels, the number of time series(es) is directly propotional
/// to cardinality of the chosen labels. Avoid using labels with potential high cardinality
/// as much as possible (e.g. `restate.invocation.id`)
static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service"];
static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service", "db"];

pub(crate) fn install_global_prometheus_recorder(opts: &CommonOptions) -> PrometheusHandle {
let builder = PrometheusBuilder::default()
Expand Down
1 change: 0 additions & 1 deletion crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ impl<'a> Transaction for RocksDBTransaction<'a> {
// writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
// because there can only be a single writer (the leading PartitionProcessor).
let write_batch = self.txn.get_writebatch();
// todo: make async and use configuration to control use of WAL
if write_batch.is_empty() {
return Ok(());
}
Expand Down
19 changes: 6 additions & 13 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::debug;

use restate_core::ShutdownError;
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::RocksDbOptions;
Expand Down Expand Up @@ -59,25 +59,18 @@ impl PartitionStoreManager {
) -> std::result::Result<Self, RocksError> {
let options = storage_opts.load();

let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::PartitionProcessor,
options.data_dir(),
db_options(),
)
.add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), options.data_dir(), db_options())
.add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();

let manager = RocksDbManager::get();
// todo remove this when open_db is async
let raw_db = tokio::task::spawn_blocking(move || manager.open_db(updateable_opts, db_spec))
.await
.map_err(|_| ShutdownError)??;

let rocksdb = manager
.get_db(Owner::PartitionProcessor, DbName::new(DB_NAME))
.unwrap();
let rocksdb = manager.get_db(DbName::new(DB_NAME)).unwrap();

Ok(Self {
raw_db,
Expand Down
Loading

0 comments on commit cfe427d

Please sign in to comment.