Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cocurrency otlp http session #1274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Increment the:
## [Unreleased]

* [EXPORTER] Jaeger Exporter - Populate Span Links ([#1251](https:/open-telemetry/opentelemetry-cpp/pull/1251))
* [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https:/open-telemetry/opentelemetry-cpp/pull/1209))

## [1.2.0] 2022-01-31

Expand Down
34 changes: 34 additions & 0 deletions api/include/opentelemetry/common/timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,39 @@ class SteadyTimestamp
private:
int64_t nanos_since_epoch_;
};

class DurationUtil
{
public:
template <class Rep, class Period>
static std::chrono::duration<Rep, Period> AdjustWaitForTimeout(
std::chrono::duration<Rep, Period> timeout,
std::chrono::duration<Rep, Period> indefinite_value) noexcept
{
// Do not call now() when this duration is max value, now() may have a expensive cost.
if (timeout == std::chrono::duration<Rep, Period>::max())
{
return indefinite_value;
}

// std::future<T>::wait_for, std::this_thread::sleep_for, and std::condition_variable::wait_for
// may use steady_clock or system_clock.We need make sure now() + timeout do not overflow.
auto max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}
max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::system_clock::time_point::max() - std::chrono::system_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}

return timeout;
}
};

} // namespace common
OPENTELEMETRY_END_NAMESPACE
2 changes: 1 addition & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

#include "opentelemetry/exporters/otlp/otlp_environment.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -71,20 +76,25 @@ struct OtlpHttpClientOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;

inline OtlpHttpClientOptions(nostd::string_view input_url,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers)
const OtlpHeaders &input_http_headers,
std::size_t input_concurrent_sessions = 8)
: url(input_url),
content_type(input_content_type),
json_bytes_mapping(input_json_bytes_mapping),
use_json_name(input_use_json_name),
console_debug(input_console_debug),
timeout(input_timeout),
http_headers(input_http_headers)
http_headers(input_http_headers),
concurrent_sessions(input_concurrent_sessions)
{}
};

Expand All @@ -99,6 +109,8 @@ class OtlpHttpClient
*/
explicit OtlpHttpClient(OtlpHttpClientOptions &&options);

~OtlpHttpClient();

/**
* Export
* @param message message to export, it should be ExportTraceServiceRequest,
Expand All @@ -114,19 +126,33 @@ class OtlpHttpClient
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept;

/**
* @brief Release the lifetime of specify session.
*
* @param session the session to release
*/
void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept;

private:
// Stores if this HTTP client had its Shutdown() method called
bool is_shutdown_ = false;
/**
* Add http session and hold it's lifetime.
* @param session the session to add
* @param event_handle the event handle of this session
*/
void addSession(
std::shared_ptr<opentelemetry::ext::http::client::Session> session,
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> &&event_handle) noexcept;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;
/**
* @brief Real delete all sessions and event handles.
* @note This function is called in the same thread where we create sessions and handles
*
* @return return true if there are more sessions to delete
*/
bool cleanupGCSessions() noexcept;

// Object that stores the HTTP sessions that have been created
std::shared_ptr<ext::http::client::HttpClient> http_client_;
// Cached parsed URI
std::string http_uri_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

// For testing
friend class OtlpHttpExporterTestPeer;
friend class OtlpHttpLogExporterTestPeer;
Expand All @@ -138,6 +164,51 @@ class OtlpHttpClient
*/
OtlpHttpClient(OtlpHttpClientOptions &&options,
std::shared_ptr<ext::http::client::HttpClient> http_client);

struct HttpSessionData
{
std::shared_ptr<opentelemetry::ext::http::client::Session> session;
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> event_handle;

inline HttpSessionData() = default;

inline explicit HttpSessionData(
std::shared_ptr<opentelemetry::ext::http::client::Session> &&input_session,
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> &&input_handle)
{
session.swap(input_session);
event_handle.swap(input_handle);
}

inline explicit HttpSessionData(HttpSessionData &&other)
{
session.swap(other.session);
event_handle.swap(other.event_handle);
}
};

// Stores if this HTTP client had its Shutdown() method called
bool is_shutdown_;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;

// Object that stores the HTTP sessions that have been created
std::shared_ptr<ext::http::client::HttpClient> http_client_;

// Cached parsed URI
std::string http_uri_;

// Running sessions and event handles
std::unordered_map<const opentelemetry::ext::http::client::Session *, HttpSessionData>
running_sessions_;
// Sessions and event handles that are waiting to be deleted
std::list<HttpSessionData> gc_sessions_;
// Lock for running_sessions_, gc_sessions_ and http_client_
std::recursive_mutex session_manager_lock_;
// Condition variable and mutex to control the concurrency count of running sessions
std::mutex session_waker_lock_;
std::condition_variable session_waker_;
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "opentelemetry/exporters/otlp/otlp_environment.h"

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>

Expand Down Expand Up @@ -50,6 +51,9 @@ struct OtlpHttpExporterOptions

// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# include "opentelemetry/exporters/otlp/otlp_environment.h"

# include <chrono>
# include <cstddef>
# include <memory>
# include <string>

Expand Down Expand Up @@ -50,6 +51,9 @@ struct OtlpHttpLogExporterOptions

// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultLogHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
};

/**
Expand Down
Loading