Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Compaction] Update APIs to support generic unique identifier format #12384

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,24 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
CompactionServiceScheduleResponse response =
db_options_.compaction_service->Schedule(info, compaction_input_binary);
switch (response.status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
return response.status;
default:
assert(false); // unknown status
break;
Expand All @@ -101,14 +101,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
&compaction_result_binary);

if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}

Expand Down Expand Up @@ -830,4 +831,3 @@ bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE

55 changes: 30 additions & 25 deletions db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,44 @@ class MyTestCompactionService : public CompactionService {

const char* Name() const override { return kClassName(); }

CompactionServiceJobStatus StartV2(
CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& info,
const std::string& compaction_service_input) override {
InstrumentedMutexLock l(&mutex_);
start_info_ = info;
assert(info.db_name == db_path_);
jobs_.emplace(info.job_id, compaction_service_input);
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
if (is_override_start_status_) {
return override_start_status_;
}
return s;
}

CompactionServiceJobStatus WaitForCompleteV2(
const CompactionServiceJobInfo& info,
std::string* compaction_service_result) override {
std::string unique_id = Env::Default()->GenerateUniqueId();
jobs_.emplace(unique_id, compaction_service_input);
infos_.emplace(unique_id, info);
CompactionServiceScheduleResponse response(
unique_id, is_override_start_status_
? override_start_status_
: CompactionServiceJobStatus::kSuccess);
return response;
}

CompactionServiceJobStatus Wait(const std::string& scheduled_job_id,
std::string* result) override {
std::string compaction_input;
assert(info.db_name == db_path_);
{
InstrumentedMutexLock l(&mutex_);
wait_info_ = info;
auto i = jobs_.find(info.job_id);
if (i == jobs_.end()) {
auto job_index = jobs_.find(scheduled_job_id);
if (job_index == jobs_.end()) {
return CompactionServiceJobStatus::kFailure;
}
compaction_input = std::move(i->second);
jobs_.erase(i);
}
compaction_input = std::move(job_index->second);
jobs_.erase(job_index);

auto info_index = infos_.find(scheduled_job_id);
if (info_index == infos_.end()) {
return CompactionServiceJobStatus::kFailure;
}
wait_info_ = std::move(info_index->second);
infos_.erase(info_index);
}
if (is_override_wait_status_) {
return override_wait_status_;
}

CompactionServiceOptionsOverride options_override;
options_override.env = options_.env;
options_override.file_checksum_gen_factory =
Expand All @@ -90,11 +94,11 @@ class MyTestCompactionService : public CompactionService {
OpenAndCompactOptions options;
options.canceled = &canceled_;

Status s = DB::OpenAndCompact(
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
compaction_input, compaction_service_result, options_override);
Status s =
DB::OpenAndCompact(options, db_path_, db_path_ + "/" + scheduled_job_id,
compaction_input, result, options_override);
if (is_override_wait_result_) {
*compaction_service_result = override_wait_result_;
*result = override_wait_result_;
}
compaction_num_.fetch_add(1);
if (s.ok()) {
Expand Down Expand Up @@ -135,7 +139,8 @@ class MyTestCompactionService : public CompactionService {
private:
InstrumentedMutex mutex_;
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
std::map<std::string, std::string> jobs_;
std::map<std::string, CompactionServiceJobInfo> infos_;
const std::string db_path_;
Options options_;
std::shared_ptr<Statistics> statistics_;
Expand Down
29 changes: 29 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,17 @@ struct CompactionServiceJobInfo {
priority(priority_) {}
};

struct CompactionServiceScheduleResponse {
std::string scheduled_job_id; // Generated outside of primary host, unique
// across different DBs and sessions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"...across different DBs, sessions and jobs"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Job Id itself within CompactionServiceJobInfo is not unique globally.

uint64_t job_id; // job_id is only unique within the current DB and session,
// restart DB will reset the job_id. `db_id` and
// `db_session_id` could help you build unique id across
// different DBs and sessions.

If users are leveraging centralized queue service externally to offload compactions to remote workers, globally unique identifier is necessary (basically unique across different DBs, sessions and jobs)

CompactionServiceJobStatus status;
CompactionServiceScheduleResponse(std::string scheduled_job_id_,
CompactionServiceJobStatus status_)
: scheduled_job_id(scheduled_job_id_), status(status_) {}
explicit CompactionServiceScheduleResponse(CompactionServiceJobStatus status_)
: status(status_) {}
};

// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
Expand All @@ -438,6 +449,24 @@ class CompactionService : public Customizable {
// Returns the name of this compaction service.
const char* Name() const override = 0;

// Schedule compaction to be processed remotely.
virtual CompactionServiceScheduleResponse Schedule(
const CompactionServiceJobInfo& /*info*/,
const std::string& /*compaction_service_input*/) {
CompactionServiceScheduleResponse response(
CompactionServiceJobStatus::kUseLocal);
return response;
}

// Wait for the scheduled compaction to finish from the remote worker
virtual CompactionServiceJobStatus Wait(
const std::string& /*scheduled_job_id*/, std::string* /*result*/) {
return CompactionServiceJobStatus::kUseLocal;
}

// Deprecated. Please implement Schedule() and Wait() API to handle remote
// compaction

// Start the remote compaction with `compaction_service_input`, which can be
// passed to `DB::OpenAndCompact()` on the remote side. `info` provides the
// information the user might want to know, which includes `job_id`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Deprecate experimental Remote Compaction APIs - StartV2() and WaitForCompleteV2() and introduce Schedule() and Wait(). The new APIs essentially does the same thing as the old APIs. They allow taking externally generated unique id to wait for remote compaction to complete.
Loading