Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tuning and combined improvements for perf and observability #1493

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ codegen-units = 1
# Let's be defensive and abort on every panic
panic = "abort"

[profile.release-debug]
inherits = "release"
debug = true

[profile.dev]
# Let's be defensive and abort on every panic
panic = "abort"
Expand Down
8 changes: 3 additions & 5 deletions crates/bifrost/benches/append_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::StreamExt;
use restate_bifrost::{Bifrost, BifrostService};
use restate_core::metadata;
use restate_rocksdb::{DbName, Owner, RocksDbManager};
use restate_rocksdb::{DbName, RocksDbManager};
use restate_types::config::{
BifrostOptionsBuilder, CommonOptionsBuilder, ConfigurationBuilder, LocalLogletOptionsBuilder,
};
Expand Down Expand Up @@ -174,14 +174,12 @@ fn write_throughput_local_loglet(c: &mut Criterion) {
group.finish();
let db_manager = RocksDbManager::get();

let db = db_manager
.get_db(Owner::Bifrost, DbName::new("local-loglet"))
.unwrap();
let db = db_manager.get_db(DbName::new("local-loglet")).unwrap();
let stats = db.get_statistics_str();
let total_wb_usage = db_manager.get_total_write_buffer_usage();
let wb_capacity = db_manager.get_total_write_buffer_capacity();
let memory = db_manager
.get_memory_usage_stats(&[(Owner::Bifrost, DbName::new("local-loglet"))])
.get_memory_usage_stats(&[DbName::new("local-loglet")])
.unwrap();
test_runner_rt.block_on(tc.shutdown_node("completed", 0));
test_runner_rt.block_on(RocksDbManager::get().shutdown());
Expand Down
32 changes: 12 additions & 20 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::sync::Arc;

use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
Expand Down Expand Up @@ -58,23 +58,18 @@ impl RocksDbLogStore {

let data_dir = options.data_dir();

let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::Bifrost,
data_dir,
db_options(options),
)
.add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
.add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
// not very important but it's to reduce the number of merges by flushing.
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
.ensure_column_families(cfs)
.build_as_db();
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, db_options(options))
.add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
.add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
// not very important but it's to reduce the number of merges by flushing.
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
.ensure_column_families(cfs)
.build_as_db();
let db_name = db_spec.name().clone();
// todo: use the returned rocksdb object when open_db returns Arc<RocksDb>
let _ = db_manager.open_db(updateable_options, db_spec)?;
let rocksdb = db_manager.get_db(Owner::Bifrost, db_name).unwrap();
let rocksdb = db_manager.get_db(db_name).unwrap();
Ok(Self { rocksdb })
}

Expand Down Expand Up @@ -138,6 +133,7 @@ fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options {
//
// Set compactions per level
//
opts.set_max_write_buffer_number(2);
opts.set_num_levels(7);
opts.set_compression_per_level(&[
DBCompressionType::None,
Expand Down Expand Up @@ -165,11 +161,7 @@ fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options {
DBCompressionType::None,
DBCompressionType::Zstd,
]);
//
// Most of the changes are highly temporal, we try to delay flushing
// to merge metadata updates into fewer L0 files.
opts.set_max_write_buffer_number(3);
opts.set_min_write_buffer_number_to_merge(3);
opts.set_max_write_buffer_number(2);
opts.set_max_successive_merges(10);
// Merge operator for log state updates
opts.set_merge_operator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl LogStoreWriter {
if self.manual_wal_flush {
// WAL flush is done in the foreground, but sync will happen in the background to avoid
// blocking IO.
if let Err(e) = self.rocksdb.inner().flush_wal(opts.sync_wal_before_ack) {
if let Err(e) = self.rocksdb.flush_wal(opts.sync_wal_before_ack).await {
warn!("Failed to flush rocksdb WAL in local loglet : {}", e);
self.send_acks(Err(Error::LogStoreError(e.into())));
return;
Expand Down
42 changes: 28 additions & 14 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ use codederror::CodedError;
use restate_core::cancellation_watcher;
use restate_core::metadata_store::{Precondition, VersionedValue};
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDbManager, RocksError,
CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager,
RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::RocksDbOptions;
use restate_types::storage::{
StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError,
};
use restate_types::Version;
use rocksdb::{BoundColumnFamily, Options, WriteOptions, DB};
use rocksdb::{BoundColumnFamily, Options, WriteBatch, WriteOptions, DB};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -63,6 +64,8 @@ pub enum MetadataStoreRequest {
pub enum Error {
#[error("storage error: {0}")]
Storage(#[from] rocksdb::Error),
#[error("rocksdb error: {0}")]
RocksDb(#[from] RocksError),
#[error("failed precondition: {0}")]
FailedPrecondition(String),
#[error("invalid argument: {0}")]
Expand Down Expand Up @@ -99,6 +102,7 @@ pub enum BuildError {
/// store in a single thread.
pub struct LocalMetadataStore {
db: Arc<DB>,
rocksdb: Arc<RocksDb>,
opts: Box<dyn Updateable<RocksDbOptions> + Send + 'static>,
request_rx: RequestReceiver,
buffer: BytesMut,
Expand All @@ -119,11 +123,11 @@ impl LocalMetadataStore {
{
let (request_tx, request_rx) = mpsc::channel(request_queue_length);

let db_name = DbName::new(DB_NAME);
let db_manager = RocksDbManager::get();
let cfs = vec![CfName::new(KV_PAIRS)];
let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::MetadataStore,
db_name.clone(),
data_dir.as_ref().to_path_buf(),
Options::default(),
)
Expand All @@ -132,9 +136,13 @@ impl LocalMetadataStore {
.build_as_db();

let db = db_manager.open_db(rocksdb_options(), db_spec)?;
let rocksdb = db_manager
.get_db(db_name)
.expect("metadata store db is open");

Ok(Self {
db,
rocksdb,
opts: Box::new(rocksdb_options()),
buffer: BytesMut::default(),
request_rx,
Expand Down Expand Up @@ -204,7 +212,7 @@ impl LocalMetadataStore {
precondition,
result_tx,
} => {
let result = self.put(&key, &value, precondition);
let result = self.put(&key, &value, precondition).await;
Self::log_error(&result, "Put");
let _ = result_tx.send(result);
}
Expand Down Expand Up @@ -244,43 +252,49 @@ impl LocalMetadataStore {
}
}

fn put(
async fn put(
&mut self,
key: &ByteString,
value: &VersionedValue,
precondition: Precondition,
) -> Result<()> {
match precondition {
Precondition::None => Ok(self.write_versioned_kv_pair(key, value)?),
Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?),
Precondition::DoesNotExist => {
let current_version = self.get_version(key)?;
if current_version.is_none() {
Ok(self.write_versioned_kv_pair(key, value)?)
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::kv_pair_exists())
}
}
Precondition::MatchesVersion(version) => {
let current_version = self.get_version(key)?;
if current_version == Some(version) {
Ok(self.write_versioned_kv_pair(key, value)?)
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::version_mismatch(version, current_version))
}
}
}
}

fn write_versioned_kv_pair(&mut self, key: &ByteString, value: &VersionedValue) -> Result<()> {
async fn write_versioned_kv_pair(
&mut self,
key: &ByteString,
value: &VersionedValue,
) -> Result<()> {
self.buffer.clear();
Self::encode(value, &mut self.buffer)?;

let write_options = self.write_options();
let cf_handle = self.kv_cf_handle();
self.db
.put_cf_opt(&cf_handle, key, self.buffer.as_ref(), &write_options)?;

Ok(())
let mut wb = WriteBatch::default();
wb.put_cf(&cf_handle, key, self.buffer.as_ref());
Ok(self
.rocksdb
.write_batch(Priority::High, IoMode::default(), write_options, wb)
.await?)
}

fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> {
Expand Down
13 changes: 5 additions & 8 deletions crates/node/src/network_server/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ const ROCKSDB_DB_PROPERTIES: &[(&str, MetricUnit)] = &[
("rocksdb.block-cache-capacity", MetricUnit::Bytes),
("rocksdb.block-cache-usage", MetricUnit::Bytes),
("rocksdb.block-cache-pinned-usage", MetricUnit::Bytes),
("rocksdb.num-running-flushes", MetricUnit::Count),
];

// Per column-family properties
Expand All @@ -161,7 +162,6 @@ const ROCKSDB_CF_PROPERTIES: &[(&str, MetricUnit)] = &[
"rocksdb.estimate-pending-compaction-bytes",
MetricUnit::Bytes,
),
("rocksdb.num-running-flushes", MetricUnit::Count),
("rocksdb.num-running-compactions", MetricUnit::Count),
("rocksdb.actual-delayed-write-rate", MetricUnit::Count),
("rocksdb.num-files-at-level0", MetricUnit::Count),
Expand Down Expand Up @@ -203,13 +203,10 @@ pub async fn render_metrics(State(state): State<NodeCtrlHandlerState>) -> String
);

for db in &all_dbs {
let labels = vec![
format!("db=\"{}\"", formatting::sanitize_label_value(&db.name)),
format!(
"owner=\"{}\"",
formatting::sanitize_label_value(db.owner.into())
),
];
let labels = vec![format!(
"db=\"{}\"",
formatting::sanitize_label_value(&db.name)
)];
// Tickers (Counters)
for ticker in ROCKSDB_TICKERS {
format_rocksdb_stat_ticker_for_prometheus(&mut out, db, &labels, *ticker);
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/network_server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use restate_types::config::CommonOptions;
/// Be mindful when adding new labels: the number of time series is directly proportional
/// to the cardinality of the chosen labels. Avoid using labels with potential high cardinality
/// as much as possible (e.g. `restate.invocation.id`)
static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service"];
static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service", "db"];

pub(crate) fn install_global_prometheus_recorder(opts: &CommonOptions) -> PrometheusHandle {
let builder = PrometheusBuilder::default()
Expand Down
1 change: 0 additions & 1 deletion crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ impl<'a> Transaction for RocksDBTransaction<'a> {
// writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
// because there can only be a single writer (the leading PartitionProcessor).
let write_batch = self.txn.get_writebatch();
// todo: make async and use configuration to control use of WAL
if write_batch.is_empty() {
return Ok(());
}
Expand Down
19 changes: 6 additions & 13 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::debug;

use restate_core::ShutdownError;
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::RocksDbOptions;
Expand Down Expand Up @@ -59,25 +59,18 @@ impl PartitionStoreManager {
) -> std::result::Result<Self, RocksError> {
let options = storage_opts.load();

let db_spec = DbSpecBuilder::new(
DbName::new(DB_NAME),
Owner::PartitionProcessor,
options.data_dir(),
db_options(),
)
.add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), options.data_dir(), db_options())
.add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();

let manager = RocksDbManager::get();
// todo remove this when open_db is async
let raw_db = tokio::task::spawn_blocking(move || manager.open_db(updateable_opts, db_spec))
.await
.map_err(|_| ShutdownError)??;

let rocksdb = manager
.get_db(Owner::PartitionProcessor, DbName::new(DB_NAME))
.unwrap();
let rocksdb = manager.get_db(DbName::new(DB_NAME)).unwrap();

Ok(Self {
raw_db,
Expand Down
Loading
Loading