Commit f9282d8

Fix processor and OTLP HTTP client that may wait forever when run under a special sequence of events which is very unlikely to occur

Signed-off-by: owent <[email protected]>
owent committed Mar 24, 2022
1 parent 28d08e9 commit f9282d8
Showing 3 changed files with 65 additions and 15 deletions.
36 changes: 27 additions & 9 deletions exporters/otlp/src/otlp_http_client.cc
@@ -659,10 +659,19 @@ OtlpHttpClient::~OtlpHttpClient()

   // Wait for all the sessions to finish
   std::unique_lock<std::mutex> lock(session_waker_lock_);
-  session_waker_.wait(lock, [this] {
-    std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
-    return running_sessions_.empty();
-  });
+  while (true)
+  {
+    {
+      std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
+      if (running_sessions_.empty())
+      {
+        break;
+      }
+    }
+    // If running_sessions_ changes and notify_one()/notify_all() fire between
+    // the predicate check and the wait, we must not block forever.
+    session_waker_.wait_for(lock, options_.timeout);
+  }
 
   // And then remove all session datas
   while (cleanupGCSessions())
@@ -707,7 +716,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
   std::unique_lock<std::mutex> lock(session_waker_lock_);
   bool wait_successful = session_waker_.wait_for(lock, options_.timeout, [this] {
     std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
-    return running_sessions_.size() <= 0;
+    return running_sessions_.empty();
   });
 
   cleanupGCSessions();
@@ -777,10 +786,19 @@ bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept
   std::unique_lock<std::mutex> lock(session_waker_lock_);
   if (timeout <= std::chrono::microseconds::zero())
   {
-    session_waker_.wait(lock, [this] {
-      std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
-      return running_sessions_.empty();
-    });
+    while (true)
+    {
+      {
+        std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
+        if (running_sessions_.empty())
+        {
+          break;
+        }
+      }
+      // If running_sessions_ changes and notify_one()/notify_all() fire between
+      // the predicate check and the wait, we must not block forever.
+      session_waker_.wait_for(lock, options_.timeout);
+    }
   }
   else
   {
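To make the hazard concrete, here is a minimal, self-contained sketch of the pattern fixed above (names such as FinishSession, waker_lock, and manager_lock are simplified stand-ins, not the real OtlpHttpClient members). The waiter's predicate reads running_sessions under a different mutex than the one guarding the condition variable, so a plain wait() can miss a notification that fires between the predicate check and the block; the bounded wait_for() loop recovers by re-checking periodically.

// lost_wakeup_sketch.cc -- illustrative only; simplified from the diff above.
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>

std::mutex waker_lock;              // guards waiting on the condition variable
std::recursive_mutex manager_lock;  // guards running_sessions
std::condition_variable waker;
std::set<int> running_sessions;

// Notifier side: a session finishes and wakes the waiter. Because neither the
// erase nor the notify_all() holds waker_lock, the notification can fire while
// the waiter sits between its predicate check and its wait -- and be lost.
void FinishSession(int id)
{
  {
    std::lock_guard<std::recursive_mutex> guard{manager_lock};
    running_sessions.erase(id);
  }
  waker.notify_all();
}

// Waiter side, as patched: a plain waker.wait(lock, pred) could sleep forever
// after a lost notification; wait_for() with a bound re-checks the set at
// least once per interval, so the worst case is one extra interval of delay.
void WaitForAllSessions(std::chrono::milliseconds poll_interval)
{
  std::unique_lock<std::mutex> lock(waker_lock);
  while (true)
  {
    {
      std::lock_guard<std::recursive_mutex> guard{manager_lock};
      if (running_sessions.empty())
      {
        break;
      }
    }
    waker.wait_for(lock, poll_interval);
  }
}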
22 changes: 19 additions & 3 deletions sdk/src/logs/batch_log_processor.cc
@@ -113,7 +113,15 @@ bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
   bool result;
   if (timeout <= std::chrono::microseconds::zero())
   {
-    synchronization_data_->force_flush_cv.wait(lk_cv, break_condition);
+    bool wait_result = false;
+    while (!wait_result)
+    {
+      // is_force_flush_notified.store(true) and force_flush_cv.notify_all() may be
+      // called between is_force_flush_pending.load() and force_flush_cv.wait();
+      // we must not block forever in that case.
+      wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
+                                                                   break_condition);
+    }
     result = true;
   }
   else
@@ -245,9 +253,17 @@ void BatchLogProcessor::WaitForShutdownCompletion()
   if (is_export_async_)
   {
     std::unique_lock<std::mutex> lk(synchronization_data_->async_shutdown_m);
-    while (synchronization_data_->is_async_shutdown_notified.load() == false)
+    while (true)
     {
-      synchronization_data_->async_shutdown_cv.wait(lk);
+      if (synchronization_data_->is_async_shutdown_notified.load())
+      {
+        break;
+      }
+
+      // is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() may
+      // fire between is_async_shutdown_notified.load() and async_shutdown_cv.wait();
+      // we must not block forever in that case.
+      synchronization_data_->async_shutdown_cv.wait_for(lk, scheduled_delay_millis_);
     }
   }
 }
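The same bounded-wait idea, reduced to the shape ForceFlush takes above when the caller passes an unlimited timeout. This is a simplified sketch (the real break_condition checks more state than a single flag); the key point is that wait_for with a predicate returns the predicate's value, so a false result means "timed out, poll again" rather than "sleep forever on a missed notification".

// force_flush_wait_sketch.cc -- illustrative only; the predicate is simplified.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex cv_m;
std::condition_variable force_flush_cv;
std::atomic<bool> is_force_flush_notified{false};

void WaitForFlushCompletion(std::chrono::milliseconds poll_interval)
{
  std::unique_lock<std::mutex> lk_cv(cv_m);
  bool wait_result = false;
  while (!wait_result)
  {
    // wait_for returns the predicate's value: true once the flush was
    // notified, false on timeout -- in which case we loop and re-evaluate,
    // so a notification that raced past the wait costs at most one interval.
    wait_result = force_flush_cv.wait_for(
        lk_cv, poll_interval, [] { return is_force_flush_notified.load(); });
  }
}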
22 changes: 19 additions & 3 deletions sdk/src/trace/batch_span_processor.cc
@@ -100,7 +100,15 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
   bool result;
   if (timeout <= std::chrono::microseconds::zero())
   {
-    synchronization_data_->force_flush_cv.wait(lk_cv, break_condition);
+    bool wait_result = false;
+    while (!wait_result)
+    {
+      // is_force_flush_notified.store(true) and force_flush_cv.notify_all() may be
+      // called between is_force_flush_pending.load() and force_flush_cv.wait();
+      // we must not block forever in that case.
+      wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_,
+                                                                   break_condition);
+    }
     result = true;
   }
   else
@@ -232,9 +240,17 @@ void BatchSpanProcessor::WaitForShutdownCompletion()
   if (is_export_async_)
   {
     std::unique_lock<std::mutex> lk(synchronization_data_->async_shutdown_m);
-    while (synchronization_data_->is_async_shutdown_notified.load() == false)
+    while (true)
     {
-      synchronization_data_->async_shutdown_cv.wait(lk);
+      if (synchronization_data_->is_async_shutdown_notified.load())
+      {
+        break;
+      }
+
+      // is_async_shutdown_notified.store(true) and async_shutdown_cv.notify_all() may
+      // fire between is_async_shutdown_notified.load() and async_shutdown_cv.wait();
+      // we must not block forever in that case.
+      synchronization_data_->async_shutdown_cv.wait_for(lk, schedule_delay_millis_);
     }
   }
 }
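For contrast, the other textbook remedy for this race is to have the notifier hold the condition variable's mutex while setting the flag: the waiter's predicate check and its block then become atomic with respect to the notification, and an unbounded wait() cannot be missed. The sketch below shows that alternative under simplified names; it is a general technique, not what this commit does -- the commit instead bounds every wait, which keeps the notifier's critical section short.

// locked_notify_sketch.cc -- a general alternative, not this commit's approach.
#include <atomic>
#include <condition_variable>
#include <mutex>

std::mutex cv_m;
std::condition_variable async_shutdown_cv;
std::atomic<bool> is_async_shutdown_notified{false};

void NotifyShutdownComplete()
{
  {
    // Holding the same mutex the waiter uses closes the window between the
    // waiter's load() and its wait(): the store cannot interleave there.
    std::lock_guard<std::mutex> lk(cv_m);
    is_async_shutdown_notified.store(true);
  }
  async_shutdown_cv.notify_all();
}

void WaitForShutdownCompletion()
{
  std::unique_lock<std::mutex> lk(cv_m);
  // Safe to wait unbounded: the predicate is re-checked under cv_m on every
  // wakeup, and the notifier cannot store-and-notify inside that window.
  async_shutdown_cv.wait(lk, [] { return is_async_shutdown_notified.load(); });
}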
