Skip to content

Commit

Permalink
Fix thread-safety problem when destroying OtlpHttpClient
Browse files Browse the repository at this point in the history
Signed-off-by: owentou <[email protected]>
  • Loading branch information
owent committed Feb 14, 2022
1 parent 535304f commit 1e8e9dd
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class OtlpHttpClient
*/
explicit OtlpHttpClient(OtlpHttpClientOptions &&options);

~OtlpHttpClient();

/**
* Export
* @param message message to export, it should be ExportTraceServiceRequest,
Expand Down Expand Up @@ -145,13 +147,14 @@ class OtlpHttpClient
* @brief Real delete all sessions and event handles.
* @note This function is called in the same thread where we create sessions and handles
*
* @return return true if there are more sessions to delete
*/
void cleanupGCSessions() noexcept;
bool cleanupGCSessions() noexcept;

bool isShutdown() const noexcept;

// Stores if this HTTP client had its Shutdown() method called
std::atomic<bool> is_shutdown_;
bool is_shutdown_;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;
Expand Down Expand Up @@ -182,10 +185,15 @@ class OtlpHttpClient
event_handle.swap(other.event_handle);
}
};

// Running sessions and event handles
std::unordered_map<const opentelemetry::ext::http::client::Session *, HttpSessionData>
running_sessions_;
// Sessions and event handles that are waiting to be deleted
std::list<HttpSessionData> gc_sessions_;
// Lock for running_sessions_, gc_sessions_ and http_client_
std::recursive_mutex session_manager_lock_;
// Condition variable and mutex to control the concurrency count of running sessions
std::mutex session_waker_lock_;
std::condition_variable session_waker_;
};
Expand Down
138 changes: 84 additions & 54 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class ResponseHandler : public http_client::EventHandler
{
stoping_.store(false);
}
virtual ~ResponseHandler() { Unbind(); }

/**
* Automatically called when the response is received, store the body into a string and notify any
Expand Down Expand Up @@ -589,28 +588,33 @@ void ConvertListFieldToJson(nlohmann::json &value,
} // namespace

OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options)
: options_(options), http_client_(http_client::HttpClientFactory::Create())
: is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create())
{}

OtlpHttpClient::~OtlpHttpClient()
{
is_shutdown_.store(false);
if (!isShutdown())
{
Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now()));
}

// Wait for all the sessions to finish
std::unique_lock<std::mutex> lock(session_waker_lock_);
session_waker_.wait(lock, [this] {
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
return running_sessions_.empty();
});

// And then remove all session datas
while (cleanupGCSessions())
;
}

// ----------------------------- HTTP Client methods ------------------------------
opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
const google::protobuf::Message &message) noexcept
{
// Return failure if this exporter has been shutdown
if (isShutdown())
{
const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown";
if (options_.console_debug)
{
std::cerr << error_message << std::endl;
}
OTEL_INTERNAL_LOG_ERROR(error_message);

return opentelemetry::sdk::common::ExportResult::kFailure;
}

// Parse uri and store it to cache
if (http_uri_.empty())
{
Expand Down Expand Up @@ -678,22 +682,38 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
}

// Send the request
auto session = http_client_->CreateSession(options_.url);
auto request = session->CreateRequest();

for (auto &header : options_.http_headers)
{
request->AddHeader(header.first, header.second);
}
request->SetUri(http_uri_);
request->SetTimeoutMs(std::chrono::duration_cast<std::chrono::milliseconds>(options_.timeout));
request->SetMethod(http_client::Method::Post);
request->SetBody(body_vec);
request->ReplaceHeader("Content-Type", content_type);
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
// Return failure if this exporter has been shutdown
if (isShutdown())
{
const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown";
if (options_.console_debug)
{
std::cerr << error_message << std::endl;
}
OTEL_INTERNAL_LOG_ERROR(error_message);

// Send the request
addSession(std::move(session), std::unique_ptr<opentelemetry::ext::http::client::EventHandler>{
new ResponseHandler(options_.console_debug)});
return opentelemetry::sdk::common::ExportResult::kFailure;
}

auto session = http_client_->CreateSession(options_.url);
auto request = session->CreateRequest();

for (auto &header : options_.http_headers)
{
request->AddHeader(header.first, header.second);
}
request->SetUri(http_uri_);
request->SetTimeoutMs(std::chrono::duration_cast<std::chrono::milliseconds>(options_.timeout));
request->SetMethod(http_client::Method::Post);
request->SetBody(body_vec);
request->ReplaceHeader("Content-Type", content_type);

// Send the request
addSession(std::move(session), std::unique_ptr<opentelemetry::ext::http::client::EventHandler>{
new ResponseHandler(options_.console_debug)});
}

// Wait for the response to be received
if (options_.console_debug)
Expand All @@ -712,24 +732,27 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
return running_sessions_.size() <= options_.concurrency_sessions;
});

cleanupGCSessions();

// If an error occurred with the HTTP request
if (!wait_successful)
{
cleanupGCSessions();
return opentelemetry::sdk::common::ExportResult::kFailure;
}

cleanupGCSessions();
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true, std::memory_order_release);
{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
is_shutdown_ = true;

// Shutdown the session manager
http_client_->CancelAllSessions();
http_client_->FinishAllSessions();
// Shutdown the session manager
http_client_->CancelAllSessions();
http_client_->FinishAllSessions();
}

// Wait for all the sessions to finish
std::unique_lock<std::mutex> lock(session_waker_lock_);
Expand All @@ -738,25 +761,32 @@ bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept
return running_sessions_.empty();
});

while (!gc_sessions_.empty())
{
cleanupGCSessions();
}
while (cleanupGCSessions())
;
return true;
}

void OtlpHttpClient::ReleaseSession(
const opentelemetry::ext::http::client::Session &session) noexcept
{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
bool has_session = false;

auto seesion_iter = running_sessions_.find(&session);
if (seesion_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(seesion_iter->second));
running_sessions_.erase(seesion_iter);
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};

auto seesion_iter = running_sessions_.find(&session);
if (seesion_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(seesion_iter->second));
running_sessions_.erase(seesion_iter);

has_session = true;
}
}

if (has_session)
{
session_waker_.notify_all();
}
}
Expand All @@ -775,19 +805,17 @@ void OtlpHttpClient::addSession(

handle->Bind(this, *key);

{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
HttpSessionData& session_data = running_sessions_[key];
session_data.session.swap(session);
session_data.event_handle.swap(event_handle);
}
HttpSessionData &session_data = running_sessions_[key];
session_data.session.swap(session);
session_data.event_handle.swap(event_handle);

// Send request after the session is added
key->SendRequest(*handle);
}

void OtlpHttpClient::cleanupGCSessions() noexcept
bool OtlpHttpClient::cleanupGCSessions() noexcept
{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
std::list<HttpSessionData> gc_sessions;
gc_sessions_.swap(gc_sessions);

Expand All @@ -799,11 +827,13 @@ void OtlpHttpClient::cleanupGCSessions() noexcept
session_data.session->FinishSession();
}
}

return !gc_sessions_.empty();
}

bool OtlpHttpClient::isShutdown() const noexcept
{
return is_shutdown_.load(std::memory_order_acquire);
return is_shutdown_;
}

} // namespace otlp
Expand Down
20 changes: 16 additions & 4 deletions ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,30 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

bool CancelAllSessions() noexcept override
{
for (auto &session : sessions_)
// CancelSession may change sessions_, we can not change a container while iterating it.
while (!sessions_.empty())
{
session.second->CancelSession();
std::map<uint64_t, std::shared_ptr<Session>> sessions;
sessions.swap(sessions_);
for (auto &session : sessions)
{
session.second->CancelSession();
}
}
return true;
}

bool FinishAllSessions() noexcept override
{
for (auto &session : sessions_)
// FinishSession may change sessions_, we can not change a container while iterating it.
while (!sessions_.empty())
{
session.second->FinishSession();
std::map<uint64_t, std::shared_ptr<Session>> sessions;
sessions.swap(sessions_);
for (auto &session : sessions)
{
session.second->FinishSession();
}
}
return true;
}
Expand Down

0 comments on commit 1e8e9dd

Please sign in to comment.