From 4f4a924639aaf282d12cd37886ea84f9f5002e58 Mon Sep 17 00:00:00 2001 From: owent Date: Sun, 10 Mar 2024 17:48:09 +0800 Subject: [PATCH 1/5] Fix ForceFlush may wait forever --- .../sdk/logs/batch_log_record_processor.h | 15 +++-- .../export/periodic_exporting_metric_reader.h | 5 +- .../sdk/trace/batch_span_processor.h | 16 +++-- sdk/src/logs/batch_log_record_processor.cc | 63 +++++++++---------- .../periodic_exporting_metric_reader.cc | 46 ++++++-------- sdk/src/trace/batch_span_processor.cc | 61 +++++++++--------- 6 files changed, 99 insertions(+), 107 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index d6a44df142..e6991a8bc2 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -115,20 +115,25 @@ class BatchLogRecordProcessor : public LogRecordProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; - std::atomic is_force_flush_pending{false}; - std::atomic is_force_flush_notified{false}; - std::atomic force_flush_timeout_us{0}; std::atomic is_shutdown{false}; + std::atomic force_flush_pending_sequence{false}; + std::atomic force_flush_notified_sequence{false}; + std::atomic force_flush_timeout_us{0}; + + // Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + inline SynchronizationData() {} }; /** * @brief Notify completion of shutdown and force flush. This may be called from the any thread at * any time * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. 
*/ - static void NotifyCompletion(bool notify_force_flush, + static void NotifyCompletion(std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index b9221193fb..a91d23b994 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -50,9 +51,9 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::atomic is_force_flush_pending_{false}; std::atomic is_force_wakeup_background_worker_{false}; - std::atomic is_force_flush_notified_{false}; + std::atomic force_flush_pending_sequence_{0}; + std::atomic force_flush_notified_sequence_{0}; std::condition_variable cv_, force_flush_cv_; std::mutex cv_m_, force_flush_m_; }; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index afbf4486b0..40e706f7f5 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -115,20 +116,25 @@ class BatchSpanProcessor : public SpanProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; - std::atomic is_force_flush_pending{false}; - std::atomic is_force_flush_notified{false}; - std::atomic force_flush_timeout_us{0}; std::atomic is_shutdown{false}; + std::atomic force_flush_pending_sequence{0}; + std::atomic force_flush_notified_sequence{0}; + std::atomic force_flush_timeout_us{0}; + + // Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs + // and may not initialize the member correctly. See also + // https://stackoverflow.com/questions/53408962/try-to-understand-compiler-error-message-default-member-initializer-required-be + inline SynchronizationData() {} }; /** * @brief Notify completion of shutdown and force flush. This may be called from the any thread at * any time * - * @param notify_force_flush Flag to indicate whether to notify force flush completion. + * @param notify_force_flush Sequence to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. 
*/ - static void NotifyCompletion(bool notify_force_flush, + static void NotifyCompletion(std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 44e623e4f7..da80d5b90e 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex // Now wait for the worker thread to signal back from the Export method std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); - synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + std::uint64_t current_sequence = + synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) + + 1; synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); - auto break_condition = [this]() { + auto break_condition = [this, current_sequence]() { if (synchronization_data_->is_shutdown.load() == true) { return true; } // Wake up the worker thread once. - if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { synchronization_data_->cv.notify_one(); } - return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; }; // Fix timeout to meet requirement of wait_for @@ -110,35 +114,17 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever + // When force_flush_notified_sequence.compare_exchange_strong(...) and + // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and + // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - // If it's already signaled, we must wait util notified. 
- // We use a spin lock here - if (false == - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); - ++retry_waiting_times) - { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } - } - } - - synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release); - - return result; + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; } void BatchLogRecordProcessor::DoBackgroundWork() @@ -182,8 +168,8 @@ void BatchLogRecordProcessor::Export() { std::vector> records_arr; size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + std::uint64_t notify_force_flush = synchronization_data_->force_flush_pending_sequence.exchange( + false, std::memory_order_acq_rel); if (notify_force_flush) { num_records_to_export = buffer_.size(); @@ -217,7 +203,7 @@ void BatchLogRecordProcessor::Export() } void BatchLogRecordProcessor::NotifyCompletion( - bool notify_force_flush, + std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { @@ -226,7 +212,8 @@ void BatchLogRecordProcessor::NotifyCompletion( return; } - if (notify_force_flush) + if (notify_force_flush > + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire)) { if (exporter) { @@ -236,8 +223,15 @@ void BatchLogRecordProcessor::NotifyCompletion( std::chrono::microseconds::zero()); exporter->ForceFlush(timeout); } - synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); - synchronization_data->force_flush_cv.notify_one(); + + std::uint64_t notified_sequence = + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) + { + synchronization_data->force_flush_notified_sequence.compare_exchange_strong( + notified_sequence, notify_force_flush, std::memory_order_acq_rel); + synchronization_data->force_flush_cv.notify_all(); + } } } @@ -246,7 +240,8 @@ void BatchLogRecordProcessor::DrainQueue() while (true) { if (buffer_.empty() && - false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <= + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { break; } diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 91f604195e..571897b811 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -89,6 +89,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() }); std::future_status status; + std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire); do { status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); @@ -98,11 +99,13 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); - bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); - if (notify_force_flush) + + 
std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) { - is_force_flush_notified_.store(true, std::memory_order_release); - force_flush_cv_.notify_one(); + force_flush_notified_sequence_.compare_exchange_strong(notified_sequence, notify_force_flush, + std::memory_order_acq_rel); + force_flush_cv_.notify_all(); } return true; @@ -111,20 +114,22 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept { std::unique_lock lk_cv(force_flush_m_); - is_force_flush_pending_.store(true, std::memory_order_release); - auto break_condition = [this]() { + std::uint64_t current_sequence = + force_flush_pending_sequence_.fetch_add(1, std::memory_order_release) + 1; + auto break_condition = [this, current_sequence]() { if (IsShutdown()) { return true; } // Wake up the worker thread once. - if (is_force_flush_pending_.load(std::memory_order_acquire)) + if (force_flush_pending_sequence_.load(std::memory_order_acquire) > + force_flush_notified_sequence_.load(std::memory_order_acquire)) { is_force_wakeup_background_worker_.store(true, std::memory_order_release); cv_.notify_one(); } - return is_force_flush_notified_.load(std::memory_order_acquire); + return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; }; auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( @@ -139,30 +144,14 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called - // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait - // for ever + // When force_flush_notified_sequence_.compare_exchange_strong(...) and + // force_flush_cv_.notify_all() is called between force_flush_pending_sequence_.load(...) and + // force_flush_cv_.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - // If it will be already signaled, we must wait until notified. 
- // We use a spin lock here - if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) - { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } - } - } - is_force_flush_notified_.store(false, std::memory_order_release); - if (result) { // - If original `timeout` is `zero`, use that in exporter::forceflush @@ -184,7 +173,8 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo result = false; } } - return result; + return result && + force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index d5b96f2cc8..c9cc5230ca 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -76,23 +76,27 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // Now wait for the worker thread to signal back from the Export method std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); - synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + std::uint64_t current_sequence = + synchronization_data_->force_flush_pending_sequence.fetch_add(1, std::memory_order_release) + + 1; synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); - auto break_condition = [this]() { + auto break_condition = [this, current_sequence]() { if (synchronization_data_->is_shutdown.load() == true) { return true; } // Wake up the worker thread once. - if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); synchronization_data_->cv.notify_one(); } - return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; }; // Fix timeout to meet requirement of wait_for @@ -108,34 +112,17 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept bool result = false; while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever + // When force_flush_notified_sequence.compare_exchange_strong(...) and + // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and + // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } - // If it will be already signaled, we must wait util notified. 
- // We use a spin lock here - if (false == - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel)) - { - for (int retry_waiting_times = 0; - false == synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire); - ++retry_waiting_times) - { - opentelemetry::common::SpinLockMutex::fast_yield(); - if ((retry_waiting_times & 127) == 127) - { - std::this_thread::yield(); - } - } - } - synchronization_data_->is_force_flush_notified.store(false, std::memory_order_release); - - return result; + return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= + current_sequence; } void BatchSpanProcessor::DoBackgroundWork() @@ -179,8 +166,8 @@ void BatchSpanProcessor::Export() { std::vector> spans_arr; size_t num_records_to_export; - bool notify_force_flush = - synchronization_data_->is_force_flush_pending.exchange(false, std::memory_order_acq_rel); + std::uint64_t notify_force_flush = + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire); if (notify_force_flush) { num_records_to_export = buffer_.size(); @@ -212,7 +199,7 @@ void BatchSpanProcessor::Export() } void BatchSpanProcessor::NotifyCompletion( - bool notify_force_flush, + std::uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { @@ -221,7 +208,8 @@ void BatchSpanProcessor::NotifyCompletion( return; } - if (notify_force_flush) + if (notify_force_flush > + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire)) { if (exporter) { @@ -232,8 +220,14 @@ void BatchSpanProcessor::NotifyCompletion( exporter->ForceFlush(timeout); } - synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); - synchronization_data->force_flush_cv.notify_one(); + std::uint64_t notified_sequence = + synchronization_data->force_flush_notified_sequence.load(std::memory_order_acquire); + while (notify_force_flush > notified_sequence) + { + synchronization_data->force_flush_notified_sequence.compare_exchange_strong( + notified_sequence, notify_force_flush, std::memory_order_acq_rel); + synchronization_data->force_flush_cv.notify_all(); + } } } @@ -242,7 +236,8 @@ void BatchSpanProcessor::DrainQueue() while (true) { if (buffer_.empty() && - false == synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire)) + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) <= + synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { break; } From cc9db7ba347c54ead1c371c9495b2f8e57c41bfd Mon Sep 17 00:00:00 2001 From: owent Date: Sun, 10 Mar 2024 18:01:10 +0800 Subject: [PATCH 2/5] Fix `force_flush_pending_sequence` in log batch processor --- sdk/src/logs/batch_log_record_processor.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index da80d5b90e..a108ed587a 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -168,8 +168,8 @@ void BatchLogRecordProcessor::Export() { std::vector> records_arr; size_t num_records_to_export; - std::uint64_t notify_force_flush = synchronization_data_->force_flush_pending_sequence.exchange( - false, std::memory_order_acq_rel); + std::uint64_t notify_force_flush = + synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire); if (notify_force_flush) { 
num_records_to_export = buffer_.size(); From 5ca613fd3dc02df270db348a309b56f81e430b01 Mon Sep 17 00:00:00 2001 From: owent Date: Tue, 12 Mar 2024 15:00:37 +0800 Subject: [PATCH 3/5] Fix ForceFlush may wait a longer time than the timeout argument. --- sdk/src/logs/batch_log_record_processor.cc | 9 +++++++-- .../export/periodic_exporting_metric_reader.cc | 13 ++++++++++--- sdk/src/trace/batch_span_processor.cc | 10 ++++++++-- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index a108ed587a..8257a3b935 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -118,8 +118,13 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, - break_condition); + std::chrono::microseconds wait_timeout = scheduled_delay_millis_; + + if (wait_timeout > timeout_steady) + { + wait_timeout = std::chrono::duration_cast(timeout_steady); + } + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 571897b811..6bc612592a 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -132,8 +132,9 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; }; - auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( - timeout, std::chrono::microseconds::zero()); + std::chrono::microseconds wait_timeout = + opentelemetry::common::DurationUtil::AdjustWaitForTimeout(timeout, + std::chrono::microseconds::zero()); std::chrono::steady_clock::duration timeout_steady = std::chrono::duration_cast(wait_timeout); if (timeout_steady <= std::chrono::steady_clock::duration::zero()) @@ -148,7 +149,13 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo // force_flush_cv_.notify_all() is called between force_flush_pending_sequence_.load(...) and // force_flush_cv_.wait(). 
We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); + + wait_timeout = export_interval_millis_; + if (wait_timeout > timeout_steady) + { + wait_timeout = std::chrono::duration_cast(timeout_steady); + } + result = force_flush_cv_.wait_for(lk_cv, wait_timeout, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index c9cc5230ca..329f3d4181 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -116,8 +116,14 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // force_flush_cv.notify_all() is called between force_flush_pending_sequence.load(...) and // force_flush_cv.wait(). We must not wait for ever std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, - break_condition); + + std::chrono::microseconds wait_timeout = schedule_delay_millis_; + + if (wait_timeout > timeout_steady) + { + wait_timeout = std::chrono::duration_cast(timeout_steady); + } + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, wait_timeout, break_condition); timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } From e1946087b7d93776e06b4cfe2ac38367c7a87665 Mon Sep 17 00:00:00 2001 From: owent Date: Fri, 22 Mar 2024 19:54:37 +0800 Subject: [PATCH 4/5] Use `notify_all` to notify worker threads to avoid missing notifications in the critical section. --- sdk/src/logs/batch_log_record_processor.cc | 6 +++--- .../metrics/export/periodic_exporting_metric_reader.cc | 6 +++--- sdk/src/trace/batch_span_processor.cc | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 8257a3b935..c29e27bdd3 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr &&record) noexc { // signal the worker thread synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -93,7 +93,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= @@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); } diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 6bc612592a..2d07011a24 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ 
b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -122,12 +122,12 @@ bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeo return true; } - // Wake up the worker thread once. + // Wake up the worker thread. if (force_flush_pending_sequence_.load(std::memory_order_acquire) > force_flush_notified_sequence_.load(std::memory_order_acquire)) { is_force_wakeup_background_worker_.store(true, std::memory_order_release); - cv_.notify_one(); + cv_.notify_all(); } return force_flush_notified_sequence_.load(std::memory_order_acquire) >= current_sequence; }; @@ -188,7 +188,7 @@ bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout { if (worker_thread_.joinable()) { - cv_.notify_one(); + cv_.notify_all(); worker_thread_.join(); } return exporter_->Shutdown(timeout); diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index 329f3d4181..3827fad495 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -62,7 +62,7 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr &&span) noexcept if (buffer_size >= max_queue_size_ / 2 || buffer_size >= max_export_batch_size_) { // signal the worker thread - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } } @@ -86,13 +86,13 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return true; } - // Wake up the worker thread once. + // Wake up the worker thread. if (synchronization_data_->force_flush_pending_sequence.load(std::memory_order_acquire) > synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire)) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); } return synchronization_data_->force_flush_notified_sequence.load(std::memory_order_acquire) >= @@ -281,7 +281,7 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept if (worker_thread_.joinable()) { synchronization_data_->is_force_wakeup_background_worker.store(true, std::memory_order_release); - synchronization_data_->cv.notify_one(); + synchronization_data_->cv.notify_all(); worker_thread_.join(); } From cd7c21bfb4d1c8d99140781b98b9cc949cdedf1f Mon Sep 17 00:00:00 2001 From: owent Date: Wed, 8 May 2024 15:21:33 +0800 Subject: [PATCH 5/5] Using `uint64_t` instead of `std::uint64_t` --- .../opentelemetry/sdk/logs/batch_log_record_processor.h | 6 +++--- .../sdk/metrics/export/periodic_exporting_metric_reader.h | 4 ++-- sdk/include/opentelemetry/sdk/trace/batch_span_processor.h | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index e6991a8bc2..01a39348e2 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -116,8 +116,8 @@ class BatchLogRecordProcessor : public LogRecordProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; std::atomic is_shutdown{false}; - std::atomic force_flush_pending_sequence{false}; - std::atomic force_flush_notified_sequence{false}; + std::atomic force_flush_pending_sequence{0}; + std::atomic force_flush_notified_sequence{0}; std::atomic force_flush_timeout_us{0}; // Do not use 
SynchronizationData() = default; here, some versions of GCC&Clang have BUGs @@ -133,7 +133,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor * @param notify_force_flush Sequence to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. */ - static void NotifyCompletion(std::uint64_t notify_force_flush, + static void NotifyCompletion(uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index a91d23b994..9fb0fbf5a0 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -52,8 +52,8 @@ class PeriodicExportingMetricReader : public MetricReader /* Synchronization primitives */ std::atomic is_force_wakeup_background_worker_{false}; - std::atomic force_flush_pending_sequence_{0}; - std::atomic force_flush_notified_sequence_{0}; + std::atomic force_flush_pending_sequence_{0}; + std::atomic force_flush_notified_sequence_{0}; std::condition_variable cv_, force_flush_cv_; std::mutex cv_m_, force_flush_m_; }; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index 40e706f7f5..e23b0a2df2 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -117,8 +117,8 @@ class BatchSpanProcessor : public SpanProcessor /* Important boolean flags to handle the workflow of the processor */ std::atomic is_force_wakeup_background_worker{false}; std::atomic is_shutdown{false}; - std::atomic force_flush_pending_sequence{0}; - std::atomic force_flush_notified_sequence{0}; + std::atomic force_flush_pending_sequence{0}; + std::atomic force_flush_notified_sequence{0}; std::atomic force_flush_timeout_us{0}; // Do not use SynchronizationData() = default; here, some versions of GCC&Clang have BUGs @@ -134,7 +134,7 @@ class BatchSpanProcessor : public SpanProcessor * @param notify_force_flush Sequence to indicate whether to notify force flush completion. * @param synchronization_data Synchronization data to be notified. */ - static void NotifyCompletion(std::uint64_t notify_force_flush, + static void NotifyCompletion(uint64_t notify_force_flush, const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data);
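
For reference, below is a minimal, self-contained sketch of the sequence-counter handshake this series converges on. It is illustrative only, not SDK code: the names RequestFlushAndWait and AcknowledgeFlush do not exist in the SDK, and the types are simplified. Each flush call claims a new sequence number and waits until the worker has acknowledged at least that number; the worker publishes acknowledgements with a compare-exchange loop and wakes all waiters. Compared with the old is_force_flush_pending/is_force_flush_notified booleans, a wake-up can no longer be consumed by the wrong caller or lost between the pending check and the wait, which removes both the spin-wait fallback and the risk of waiting forever.

// Illustrative sketch of the sequence-number handshake; names are hypothetical, not SDK APIs.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

static std::atomic<std::uint64_t> pending_sequence{0};
static std::atomic<std::uint64_t> notified_sequence{0};
static std::condition_variable cv;
static std::mutex cv_m;

// Caller side: claim a new sequence number and wait until the worker has
// acknowledged at least that number. The predicate is re-checked under the
// lock, so a notification that fires between the load and the wait is not
// lost, and an acknowledgement of an older request cannot satisfy a newer one.
static bool RequestFlushAndWait(std::chrono::milliseconds timeout)
{
  std::unique_lock<std::mutex> lk(cv_m);
  const std::uint64_t my_sequence =
      pending_sequence.fetch_add(1, std::memory_order_release) + 1;
  return cv.wait_for(lk, timeout, [my_sequence] {
    return notified_sequence.load(std::memory_order_acquire) >= my_sequence;
  });
}

// Worker side: after exporting, publish the highest sequence served so far.
// The compare-exchange loop keeps notified_sequence monotonic when several
// flush requests race, and notify_all wakes every waiter so each one can
// re-check its own sequence number.
static void AcknowledgeFlush()
{
  const std::uint64_t served = pending_sequence.load(std::memory_order_acquire);
  std::uint64_t seen = notified_sequence.load(std::memory_order_acquire);
  while (served > seen)
  {
    notified_sequence.compare_exchange_strong(seen, served, std::memory_order_acq_rel);
  }
  std::lock_guard<std::mutex> guard(cv_m);  // take the lock so no waiter misses the wake-up
  cv.notify_all();
}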