Skip to content

Commit

Permalink
New API for remote compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Feb 27, 2024
1 parent a43481b commit 2e569cf
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 39 deletions.
28 changes: 14 additions & 14 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
default:
assert(false); // unknown status
break;
Expand All @@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);

if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}

Expand Down Expand Up @@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

55 changes: 30 additions & 25 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService {

const char* Name() const override { return kClassName(); }

CompactionServiceJobStatus StartV2(
CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) override {
InstrumentedMutexLock l(&mutex_);
start_info_ = info;
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status_) {
return override_start_status_;
}
return s;
}

CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) override {
std::string unique_id = Env::Default()->GenerateUniqueId();
jobs_.emplace(unique_id, compaction_service_input);
infos_.emplace(unique_id, info);
CompactionServiceScheduleResponse response(
unique_id, is_override_start_status_
? override_start_status_
: CompactionServiceJobStatus::kSuccess);
return response;
}

CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
std::string* result) override {
std::string compaction_input;
assert(info.db_name == db_path_);
{
InstrumentedMutexLock l(&mutex_);
wait_info_ = info;
auto i = jobs_.find(info.job_id);
if (i == jobs_.end()) {
auto job_index = jobs_.find(scheduled_job_id);
if (job_index == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}
compaction_input = std::move(job_index->second);
jobs_.erase(job_index);

auto info_index = infos_.find(scheduled_job_id);
if (info_index == infos_.end()) {
return CompactionServiceJobStatus::kFailure;
}
wait_info_ = std::move(info_index->second);
infos_.erase(info_index);
}
if (is_override_wait_status_) {
return override_wait_status_;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
Expand All @@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService {
OpenAndCompactOptions options;
options.canceled = &canceled_;

Status s = DB::OpenAndCompact(
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
compaction_input, compaction_service_result, options_override);
Status s =
DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
compaction_input, result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
*result = override_wait_result_;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
Expand Down Expand Up @@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService {
private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
std::map<std::string, std::string> jobs_;
std::map<std::string, CompactionServiceJobInfo> infos_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;
Expand Down
29 changes: 29 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,17 @@ struct CompactionServiceJobInfo {
priority(priority_) {}
};

struct CompactionServiceScheduleResponse {
std::string scheduled_job_id; // Generated outside of primary host, unique
// across different DBs and sessions
CompactionServiceJobStatus status;
CompactionServiceScheduleResponse(std::string scheduled_job_id_,
CompactionServiceJobStatus status_)
: scheduled_job_id(scheduled_job_id_), status(status_) {}
CompactionServiceScheduleResponse(CompactionServiceJobStatus status_)
: status(status_) {}
};

// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
Expand All @@ -438,6 +449,24 @@ class CompactionService : public Customizable {
// Returns the name of this compaction service.
const char* Name() const override = 0;

// Schedule compaction to be processed remotely.
virtual CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& /*info*/,
const std::string& /*compaction_service_input*/) {
CompactionServiceScheduleResponse response(
CompactionServiceJobStatus::kUseLocal);
return response;
}

// Wait for the scheduled compaction to finish from the remote worker
virtual CompactionServiceJobStatus Wait(
const std::string& /*scheduled_job_id*/, std::string* /*result*/) {
return CompactionServiceJobStatus::kUseLocal;
}

// Deprecated. Please implement Schedule() and Wait() API to handle remote
// compaction

// Start the remote compaction with `compaction_service_input`, which can be
// passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
// information the user might want to know, which includes `job_id`.
Expand Down

0 comments on commit 2e569cf

Please sign in to comment.