From 2e569cff6d724a836208f3a208bc3c1c95f8d182 Mon Sep 17 00:00:00 2001 From: Jay Huh Date: Mon, 26 Feb 2024 13:29:31 -0800 Subject: [PATCH] New API for remote compaction --- db/compaction/compaction_service_job.cc | 28 ++++++------ db/compaction/compaction_service_test.cc | 55 +++++++++++++----------- include/rocksdb/options.h | 29 +++++++++++++ 3 files changed, 73 insertions(+), 39 deletions(-) diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 442eaf8ea725..2411c27aac36 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.output_level, input_files_oss.str().c_str()); CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, GetCompactionId(sub_compact), thread_pri_); - CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->StartV2(info, compaction_input_binary); - switch (compaction_status) { + CompactionServiceScheduleResponse response = + db_options_.compaction_service->Schedule(info, compaction_input_binary); + switch (response.status) { case CompactionServiceJobStatus::kSuccess: break; case CompactionServiceJobStatus::kFailure: sub_compact->status = Status::Incomplete( - "CompactionService failed to start compaction job."); + "CompactionService failed to schedule a remote compaction job."); ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Remote compaction failed to start.", compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; + return response.status; case CompactionServiceJobStatus::kUseLocal: ROCKS_LOG_INFO( db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API Start.", + "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", compaction_input.column_family.name.c_str(), job_id_); - return compaction_status; + return response.status; default: assert(false); 
// unknown status break; @@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Waiting for remote compaction...", compaction_input.column_family.name.c_str(), job_id_); std::string compaction_result_binary; - compaction_status = db_options_.compaction_service->WaitForCompleteV2( - info, &compaction_result_binary); + CompactionServiceJobStatus compaction_status = + db_options_.compaction_service->Wait(response.scheduled_job_id, + &compaction_result_binary); if (compaction_status == CompactionServiceJobStatus::kUseLocal) { - ROCKS_LOG_INFO(db_options_.info_log, - "[%s] [JOB %d] Remote compaction fallback to local by API " - "WaitForComplete.", - compaction_input.column_family.name.c_str(), job_id_); + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", + compaction_input.column_family.name.c_str(), job_id_); return compaction_status; } @@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, } #endif // NDEBUG } // namespace ROCKSDB_NAMESPACE - diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 7c87f88d1be0..3fd6ad83bcad 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService { const char* Name() const override { return kClassName(); } - CompactionServiceJobStatus StartV2( + CompactionServiceScheduleResponse Schedule( const CompactionServiceJobInfo& info, const std::string& compaction_service_input) override { InstrumentedMutexLock l(&mutex_); start_info_ = info; assert(info.db_name == db_path_); - jobs_.emplace(info.job_id, compaction_service_input); - CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; - if (is_override_start_status_) { - return override_start_status_; - } - return s; - } - - CompactionServiceJobStatus WaitForCompleteV2( - 
const CompactionServiceJobInfo& info, - std::string* compaction_service_result) override { + std::string unique_id = Env::Default()->GenerateUniqueId(); + jobs_.emplace(unique_id, compaction_service_input); + infos_.emplace(unique_id, info); + CompactionServiceScheduleResponse response( + unique_id, is_override_start_status_ + ? override_start_status_ + : CompactionServiceJobStatus::kSuccess); + return response; + } + + CompactionServiceJobStatus Wait(const std::string& scheduled_job_id, + std::string* result) override { std::string compaction_input; - assert(info.db_name == db_path_); { InstrumentedMutexLock l(&mutex_); - wait_info_ = info; - auto i = jobs_.find(info.job_id); - if (i == jobs_.end()) { + auto job_index = jobs_.find(scheduled_job_id); + if (job_index == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } - compaction_input = std::move(i->second); - jobs_.erase(i); - } + compaction_input = std::move(job_index->second); + jobs_.erase(job_index); + auto info_index = infos_.find(scheduled_job_id); + if (info_index == infos_.end()) { + return CompactionServiceJobStatus::kFailure; + } + wait_info_ = std::move(info_index->second); + infos_.erase(info_index); + } if (is_override_wait_status_) { return override_wait_status_; } - CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = @@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService { OpenAndCompactOptions options; options.canceled = &canceled_; - Status s = DB::OpenAndCompact( - options, db_path_, db_path_ + "/" + std::to_string(info.job_id), - compaction_input, compaction_service_result, options_override); + Status s = + DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id, + compaction_input, result, options_override); if (is_override_wait_result_) { - *compaction_service_result = override_wait_result_; + *result = override_wait_result_; } compaction_num_.fetch_add(1); if 
(s.ok()) { @@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService { private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; - std::map jobs_; + std::map jobs_; + std::map infos_; const std::string db_path_; Options options_; std::shared_ptr statistics_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8932075de0af..33803eb1f67b 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -428,6 +428,17 @@ struct CompactionServiceJobInfo { priority(priority_) {} }; +struct CompactionServiceScheduleResponse { + std::string scheduled_job_id; // Generated outside of primary host, unique + // across different DBs and sessions + CompactionServiceJobStatus status; + CompactionServiceScheduleResponse(std::string scheduled_job_id_, + CompactionServiceJobStatus status_) + : scheduled_job_id(scheduled_job_id_), status(status_) {} + CompactionServiceScheduleResponse(CompactionServiceJobStatus status_) + : status(status_) {} +}; + // Exceptions MUST NOT propagate out of overridden functions into RocksDB, // because RocksDB is not exception-safe. This could cause undefined behavior // including data loss, unreported corruption, deadlocks, and more. @@ -438,6 +449,24 @@ class CompactionService : public Customizable { // Returns the name of this compaction service. const char* Name() const override = 0; + // Schedule compaction to be processed remotely. + virtual CompactionServiceScheduleResponse Schedule( + const CompactionServiceJobInfo& /*info*/, + const std::string& /*compaction_service_input*/) { + CompactionServiceScheduleResponse response( + CompactionServiceJobStatus::kUseLocal); + return response; + } + + // Wait for the scheduled compaction to finish from the remote worker + virtual CompactionServiceJobStatus Wait( + const std::string& /*scheduled_job_id*/, std::string* /*result*/) { + return CompactionServiceJobStatus::kUseLocal; + } + + // Deprecated. 
Please implement the Schedule() and Wait() APIs to handle + // remote compaction + // Start the remote compaction with `compaction_service_input`, which can be // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the // information the user might want to know, which includes `job_id`.