Skip to content

Commit

Permalink
Add ForceFlush for all LogRecordExporters and SpanExporters. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Mar 20, 2023
1 parent 3a09d53 commit 9b222f2
Show file tree
Hide file tree
Showing 46 changed files with 680 additions and 116 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Increment the:
[#2060](https:/open-telemetry/opentelemetry-cpp/pull/2060)
* [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on
top level.[#2062](https:/open-telemetry/opentelemetry-cpp/pull/2062)
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
[#2000](https:/open-telemetry/opentelemetry-cpp/pull/2000)

Important changes:

Expand All @@ -44,6 +46,12 @@ Important changes:
* As a result, a behavior change for GRPC SSL is possible,
because the endpoint scheme now takes precedence.
Please verify configuration settings for the GRPC endpoint.
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
[#2000](https:/open-telemetry/opentelemetry-cpp/pull/2000)
* `LogRecordExporter` and `SpanExporter` add a new virtual function
`ForceFlush`, and if users implement any customized `LogRecordExporter` and
`SpanExporter`, they should also implement this function.There should be no
influence if users only use factory to create exporters.

## [1.8.3] 2023-03-06

Expand Down
21 changes: 21 additions & 0 deletions examples/otlp/grpc_log_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"

// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
// running exportings when destroy and shutdown exporters.It's optional to users.
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/trace/tracer_provider.h"

# include <string>

# ifdef BAZEL_BUILD
Expand Down Expand Up @@ -42,6 +47,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand All @@ -59,6 +72,14 @@ void InitLogger()

void CleanupLogger()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
logs::Provider::GetLoggerProvider();
if (provider)
{
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
}

nostd::shared_ptr<logs::LoggerProvider> none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
Expand Down
12 changes: 12 additions & 0 deletions examples/otlp/grpc_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"

// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
// destroy and shutdown exporters.It's optional to users.
#include "opentelemetry/sdk/trace/tracer_provider.h"

#ifdef BAZEL_BUILD
# include "examples/common/foo_library/foo_library.h"
#else
Expand All @@ -32,6 +36,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand Down
53 changes: 46 additions & 7 deletions examples/otlp/http_log_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"

// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
// running exportings when destroy and shutdown exporters.It's optional to users.
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/trace/tracer_provider.h"

# include <iostream>
# include <string>

# ifdef BAZEL_BUILD
Expand All @@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log;
namespace
{

opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;
opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts;
void InitTracer()
{
if (trace_opts.url.size() > 9)
{
if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs")
{
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces";
}
else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/")
{
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces";
}
else
{
trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint();
}
}
std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpExporterFactory::Create(opts);
auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts);
auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace_sdk::TracerProviderFactory::Create(std::move(processor));
Expand All @@ -46,13 +68,22 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}

opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
void InitLogger()
{
std::cout << "Using " << logger_opts.url << " to export log records." << std::endl;
logger_opts.console_debug = true;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts);
Expand All @@ -65,6 +96,14 @@ void InitLogger()

void CleanupLogger()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
logs::Provider::GetLoggerProvider();
if (provider)
{
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<logs::LoggerProvider> none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
Expand All @@ -83,26 +122,26 @@ int main(int argc, char *argv[])
{
if (argc > 1)
{
opts.url = argv[1];
trace_opts.url = argv[1];
logger_opts.url = argv[1];
if (argc > 2)
{
std::string debug = argv[2];
opts.console_debug = debug != "" && debug != "0" && debug != "no";
std::string debug = argv[2];
trace_opts.console_debug = debug != "" && debug != "0" && debug != "no";
}

if (argc > 3)
{
std::string binary_mode = argv[3];
if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin")
{
opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
}
}
}

if (opts.console_debug)
if (trace_opts.console_debug)
{
internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug);
}
Expand Down
12 changes: 12 additions & 0 deletions examples/otlp/http_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"

// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
// destroy and shutdown exporters.It's optional to users.
#include "opentelemetry/sdk/trace/tracer_provider.h"

#include <string>

#ifdef BAZEL_BUILD
Expand Down Expand Up @@ -38,6 +42,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/http_client_factory.h"
# include "opentelemetry/nostd/shared_ptr.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/recordable.h"

# include <time.h>
# include <atomic>
# include <condition_variable>
# include <cstddef>
# include <iostream>
# include <mutex>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

/**
* Force flush the exporter.
* @param timeout an option timeout, default to max.
* @return return true when all data are exported, and false when timeout
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
Expand All @@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
std::shared_ptr<ext::http::client::HttpClient> http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

# ifdef ENABLE_ASYNC_EXPORT
struct SynchronizationData
{
std::atomic<std::size_t> session_counter_;
std::atomic<std::size_t> finished_session_counter_;
std::condition_variable force_flush_cv;
std::mutex force_flush_cv_m;
std::recursive_mutex force_flush_m;
};
nostd::shared_ptr<SynchronizationData> synchronization_data_;
# endif
};
} // namespace logs
} // namespace exporter
Expand Down
70 changes: 64 additions & 6 deletions exporters/elasticsearch/src/es_log_record_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler
# endif

ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter()
: options_{ElasticsearchExporterOptions()},
http_client_{ext::http::client::HttpClientFactory::Create()}
{}
: options_{ElasticsearchExporterOptions()}, http_client_
{
ext::http::client::HttpClientFactory::Create()
}
# ifdef ENABLE_ASYNC_EXPORT
, synchronization_data_(new SynchronizationData())
# endif
{
# ifdef ENABLE_ASYNC_EXPORT
synchronization_data_->finished_session_counter_.store(0);
synchronization_data_->session_counter_.store(0);
# endif
}

ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter(
const ElasticsearchExporterOptions &options)
Expand Down Expand Up @@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(

# ifdef ENABLE_ASYNC_EXPORT
// Send the request
std::size_t span_count = records.size();
auto handler = std::make_shared<AsyncResponseHandler>(
synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release);
std::size_t span_count = records.size();
auto synchronization_data = synchronization_data_;
auto handler = std::make_shared<AsyncResponseHandler>(
session,
[span_count](opentelemetry::sdk::common::ExportResult result) {
[span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) {
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export "
Expand All @@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count
<< " trace span(s) success");
}

synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release);
synchronization_data->force_flush_cv.notify_all();
return true;
},
options_.console_debug_);
Expand Down Expand Up @@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
# endif
}

bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
{
# ifdef ENABLE_ASYNC_EXPORT
std::lock_guard<std::recursive_mutex> lock_guard{synchronization_data_->force_flush_m};
std::size_t running_counter =
synchronization_data_->session_counter_.load(std::memory_order_acquire);
// ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented
// in type 'long int' here. So we reset timeout to meet signed long int limit here.
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());

std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
timeout_steady = std::chrono::steady_clock::duration::max();
}

std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);
// Wait for all the sessions to finish
while (timeout_steady > std::chrono::steady_clock::duration::zero())
{
if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >=
running_counter)
{
break;
}

std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for(
lk_cv, std::chrono::seconds{options_.response_timeout_}))
{
break;
}
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

return timeout_steady > std::chrono::steady_clock::duration::zero();
# else
return true;
# endif
}

bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
8 changes: 6 additions & 2 deletions exporters/etw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ target_include_directories(
set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME
etw_exporter)

target_link_libraries(opentelemetry_exporter_etw
INTERFACE opentelemetry_api nlohmann_json::nlohmann_json)
target_link_libraries(
opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace
nlohmann_json::nlohmann_json)
if(WITH_LOGS_PREVIEW)
target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs)
endif()
if(nlohmann_json_clone)
add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json)
endif()
Expand Down
Loading

0 comments on commit 9b222f2

Please sign in to comment.