Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Fix forceflush may wait for ever #2584

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4f4a924
Fix forceflush may wait for ever
owent Mar 10, 2024
cc9db7b
Fix `force_flush_pending_sequence` in log batch processor
owent Mar 10, 2024
8561e1f
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 12, 2024
5ca613f
Fix ForceFlush may wait a longer time than the timeout argument.
owent Mar 12, 2024
bbb1f9a
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 14, 2024
e7031c2
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 21, 2024
e194608
Use `notify_all` to notify worker threads to avoid notification miss …
owent Mar 22, 2024
6f12107
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 22, 2024
dbb6bb7
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 25, 2024
7444f31
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Mar 29, 2024
7e0596c
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Apr 6, 2024
ab39d6c
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Apr 12, 2024
70d4a88
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Apr 26, 2024
d5ca529
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
marcalff Apr 29, 2024
0b95c70
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
owent Apr 30, 2024
eea67c8
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
ThomsonTan May 6, 2024
cd7c21b
Using `uint64_t` instead of `std::uint64_t`
owent May 8, 2024
1c21601
Merge remote-tracking branch 'github/main' into fix_hangs_too_long_wh…
owent May 10, 2024
f28eab8
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
ThomsonTan May 16, 2024
d3df9a4
Merge branch 'main' into fix_hangs_too_long_when_calling_forceflush
ThomsonTan May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,25 @@ class BatchLogRecordProcessor : public LogRecordProcessor

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
std::atomic<uint64_t> force_flush_pending_sequence{0};
std::atomic<uint64_t> force_flush_notified_sequence{0};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};

// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
// and may not initialize the member correctly. See also
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
inline SynchronizationData() {}
};

/**
* @brief Notify completion of shutdown and force flush. This may be called from any thread at
* any time.
*
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
static void NotifyCompletion(uint64_t notify_force_flush,
const std::unique_ptr<LogRecordExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -50,9 +51,9 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::atomic<bool> is_force_flush_pending_{false};
std::atomic<bool> is_force_wakeup_background_worker_{false};
std::atomic<bool> is_force_flush_notified_{false};
std::atomic<uint64_t> force_flush_pending_sequence_{0};
std::atomic<uint64_t> force_flush_notified_sequence_{0};
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};
Expand Down
16 changes: 11 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
Expand Down Expand Up @@ -115,20 +116,25 @@ class BatchSpanProcessor : public SpanProcessor

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
std::atomic<uint64_t> force_flush_pending_sequence{0};
std::atomic<uint64_t> force_flush_notified_sequence{0};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};

// Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs
// and may not initialize the member correctly. See also
// https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be
inline SynchronizationData() {}
};

/**
* @brief Notify completion of shutdown and force flush. This may be called from any thread at
* any time.
*
* @param notify_force_flush Flag to indicate whether to notify force flush completion.
* @param notify_force_flush Sequence to indicate whether to notify force flush completion.
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
static void NotifyCompletion(uint64_t notify_force_flush,
const std::unique_ptr<SpanExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data);

Expand Down
74 changes: 37 additions & 37 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
{
// signal the worker thread
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
marcalff marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);

synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
std::uint64_t current_sequence =
synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the fetch_add here just the same as synchronization_data_->force_flush_pending_sequence++?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my understanding, synchronization_data_->force_flush_pending_sequence++ use std::memory_order_seq_cst , we can use std::memory_order_release here.

1;
synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release);
auto break_condition = [this]() {
auto break_condition = [this, current_sequence]() {
if (synchronization_data_->is_shutdown.load() == true)
{
return true;
}

// Wake up the worker thread once.
if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) >
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
}

return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
current_sequence;
};

// Fix timeout to meet requirement of wait_for
Expand All @@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
// between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
// for ever
// If force_flush_notified_sequence.compare_exchange_strong(...) and
// force_flush_cv.notify_all() are called between force_flush_pending_sequence.load(...) and
// force_flush_cv.wait(), we must not wait forever.
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
std::chrono::microseconds wait_timeout = scheduled_delay_millis_;

// If it's already signaled, we must wait util notified.
// We use a spin lock here
if (false ==
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
++retry_waiting_times)
if (wait_timeout > timeout_steady)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
}
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release);

return result;
return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >=
current_sequence;
}

void BatchLogRecordProcessor::DoBackgroundWork()
Expand Down Expand Up @@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export()
{
std::vector<std::unique_ptr<Recordable>> records_arr;
size_t num_records_to_export;
bool notify_force_flush =
synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel);
std::uint64_t notify_force_flush =
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire);
if (notify_force_flush)
{
num_records_to_export = buffer_.size();
Expand Down Expand Up @@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export()
}

void BatchLogRecordProcessor::NotifyCompletion(
bool notify_force_flush,
std::uint64_t notify_force_flush,
const std::unique_ptr<LogRecordExporter> &exporter,
const std::shared_ptr<SynchronizationData> &synchronization_data)
{
Expand All @@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion(
return;
}

if (notify_force_flush)
if (notify_force_flush >
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire))
{
if (exporter)
{
Expand All @@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion(
std::chrono::microseconds::zero());
exporter->ForceFlush(timeout);
}
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();

std::uint64_t notified_sequence =
synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
{
synchronization_data->force_flush_notified_sequence.compare_exchange_strong(
notified_sequence, notify_force_flush, std::memory_order_acq_rel);
synchronization_data->force_flush_cv.notify_all();
}
}
}

Expand All @@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue()
while (true)
{
if (buffer_.empty() &&
false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <=
synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire))
{
break;
}
Expand Down Expand Up @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
if (worker_thread_.joinable())
{
synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release);
synchronization_data_->cv.notify_one();
synchronization_data_->cv.notify_all();
worker_thread_.join();
}

Expand Down
66 changes: 29 additions & 37 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
});

std::future_status status;
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
Expand All @@ -99,12 +100,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
if (notify_force_flush)

std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
{
std::unique_lock<std::mutex> lk(force_flush_m_);
is_force_flush_notified_.store(true, std::memory_order_release);
force_flush_cv_.notify_one();
force_flush_notified_sequence_.compare_exchange_strong(notified_sequence, notify_force_flush,
std::memory_order_acq_rel);
force_flush_cv_.notify_all();
}

return true;
Expand All @@ -113,24 +115,27 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
std::unique_lock<std::mutex> lk_cv(force_flush_m_);
is_force_flush_pending_.store(true, std::memory_order_release);
auto break_condition = [this]() {
std::uint64_t current_sequence =
force_flush_pending_sequence_.fetch_add(1, std::memory_order_release) + 1;
auto break_condition = [this, current_sequence]() {
if (IsShutdown())
{
return true;
}

// Wake up the worker thread once.
if (is_force_flush_pending_.load(std::memory_order_acquire))
// Wake up the worker thread.
if (force_flush_pending_sequence_.load(std::memory_order_acquire) >
force_flush_notified_sequence_.load(std::memory_order_acquire))
{
is_force_wakeup_background_worker_.store(true, std::memory_order_release);
cv_.notify_one();
cv_.notify_all();
}
return is_force_flush_notified_.load(std::memory_order_acquire);
return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
};

auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
std::chrono::microseconds wait_timeout =
opentelemetry::common::DurationUtil::AdjustWaitForTimeout(timeout,
std::chrono::microseconds::zero());
std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(wait_timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
Expand All @@ -141,29 +146,19 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
bool result = false;
while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
// When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
// between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
// for ever
// If force_flush_notified_sequence_.compare_exchange_strong(...) and
// force_flush_cv_.notify_all() are called between force_flush_pending_sequence_.load(...) and
// force_flush_cv_.wait(), we must not wait forever.
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

// If it will be already signaled, we must wait until notified.
// We use a spin lock here
if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
{
for (int retry_waiting_times = 0;
false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
wait_timeout = export_interval_millis_;
if (wait_timeout > timeout_steady)
{
opentelemetry::common::SpinLockMutex::fast_yield();
if ((retry_waiting_times & 127) == 127)
{
std::this_thread::yield();
}
wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
}
result = force_flush_cv_.wait_for(lk_cv, wait_timeout, break_condition);
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
is_force_flush_notified_.store(false, std::memory_order_release);

if (result)
{
Expand All @@ -186,18 +181,15 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo
result = false;
}
}
return result;
return result &&
force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence;
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
{
// ensure that `cv_` is awaiting, and the update doesn't get lost
std::unique_lock<std::mutex> lk(cv_m_);
cv_.notify_all();
}
cv_.notify_all();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
Expand Down
Loading