Commit: Fix condition variables and notifications when ForceFlush.

owent committed Mar 24, 2022
1 parent 075bc27 commit 28d08e9
Showing 13 changed files with 259 additions and 198 deletions.
37 changes: 21 additions & 16 deletions api/include/opentelemetry/common/spin_lock_mutex.h
@@ -59,6 +59,26 @@ class SpinLockMutex
SpinLockMutex &operator=(const SpinLockMutex &) = delete;
SpinLockMutex &operator=(const SpinLockMutex &) volatile = delete;

static inline void fast_yield() noexcept
{
// Issue a Pause/Yield instruction while spinning.
#if defined(_MSC_VER)
YieldProcessor();
#elif defined(__i386__) || defined(__x86_64__)
# if defined(__clang__)
_mm_pause();
# else
__builtin_ia32_pause();
# endif
#elif defined(__arm__)
// If YIELD is not supported on the current processor, this intrinsic will fail
// to resolve at compile time.
__yield();
#else
// TODO: Issue PAUSE/YIELD on other architectures.
#endif
}

/**
* Attempts to lock the mutex. Return immediately with `true` (success) or `false` (failure).
*/
@@ -91,22 +111,7 @@ class SpinLockMutex
{
return;
}
// Issue a Pause/Yield instruction while spinning.
#if defined(_MSC_VER)
YieldProcessor();
#elif defined(__i386__) || defined(__x86_64__)
# if defined(__clang__)
_mm_pause();
# else
__builtin_ia32_pause();
# endif
#elif defined(__arm__)
// This intrinsic should fail to be found if YIELD is not supported on the current
// processor.
__yield();
#else
// TODO: Issue PAGE/YIELD on other architectures.
#endif
fast_yield();
}
// Yield then try again (goal ~100ns)
std::this_thread::yield();
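For context, this commit only extracts the pause/yield hint into the reusable fast_yield() helper; the surrounding lock loop is unchanged. The sketch below shows the spin-then-yield pattern the helper serves. It is a minimal illustration, not the upstream implementation: the retry count is invented, and the pause covers only GCC/Clang on x86, whereas the real helper also handles MSVC and ARM as shown in the hunk above.

#include <atomic>
#include <thread>

// Simplified stand-in for SpinLockMutex::fast_yield(); x86 GCC/Clang only.
static inline void pause_hint() noexcept
{
#if defined(__i386__) || defined(__x86_64__)
  __builtin_ia32_pause();  // PAUSE: tell the CPU we are busy-waiting
#endif
}

class SketchSpinLock
{
public:
  void lock() noexcept
  {
    for (;;)
    {
      // Fast path: try to take the lock immediately.
      if (!flag_.test_and_set(std::memory_order_acquire))
        return;
      // Spin briefly with a CPU-level pause between attempts.
      for (int i = 0; i < 100; ++i)  // illustrative retry count
      {
        if (!flag_.test_and_set(std::memory_order_acquire))
          return;
        pause_hint();
      }
      // Still contended: yield the rest of the time slice to the OS.
      std::this_thread::yield();
    }
  }

  void unlock() noexcept { flag_.clear(std::memory_order_release); }

private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};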
exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h
@@ -78,8 +78,8 @@ struct OtlpHttpClientOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
// Concurrent requests
std::size_t max_concurrent_requests = 8;

inline OtlpHttpClientOptions(nostd::string_view input_url,
HttpRequestContentType input_content_type,
@@ -96,7 +96,7 @@
console_debug(input_console_debug),
timeout(input_timeout),
http_headers(input_http_headers),
concurrent_sessions(input_concurrent_sessions)
max_concurrent_requests(input_concurrent_sessions)
{}
};

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h
@@ -52,8 +52,9 @@ struct OtlpHttpExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
};

/**
exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_exporter.h
@@ -52,8 +52,9 @@ struct OtlpHttpLogExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultLogHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
};

/**
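For reference, the renamed option is set like any other field on the exporter options struct. A minimal usage sketch follows, assuming the struct's url field and using an illustrative local endpoint:

#include "opentelemetry/exporters/otlp/otlp_http_exporter.h"

namespace otlp = opentelemetry::exporter::otlp;

int main()
{
  otlp::OtlpHttpExporterOptions options;
  options.url = "http://localhost:4318/v1/traces";  // illustrative endpoint
  // Cap the number of in-flight export requests; 8 is the default, and the
  // OTLP spec link above discusses the concurrency trade-offs.
  options.max_concurrent_requests = 8;
  otlp::OtlpHttpExporter exporter(options);
  return 0;
}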
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_client.cc
@@ -751,7 +751,7 @@ void OtlpHttpClient::Export(
std::unique_lock<std::mutex> lock(session_waker_lock_);
session_waker_.wait_for(lock, options_.timeout, [this] {
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
return running_sessions_.size() <= options_.concurrent_sessions;
return running_sessions_.size() <= options_.max_concurrent_requests;
});

cleanupGCSessions();
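The wait_for call in the hunk above is the standard condition-variable-with-predicate idiom: the predicate is re-checked on every wakeup, so spurious wakeups and stale notifications are handled uniformly. Below is a self-contained sketch of the same bounded-concurrency gate; the class and member names are illustrative, not the client's API:

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Illustrative gate that bounds the number of concurrent sessions.
class SessionGate
{
public:
  explicit SessionGate(std::size_t max_concurrent) : max_concurrent_(max_concurrent) {}

  // Block (up to `timeout`) until a slot is free, then claim it.
  template <class Rep, class Period>
  bool Acquire(std::chrono::duration<Rep, Period> timeout)
  {
    std::unique_lock<std::mutex> lock(m_);
    // wait_for re-evaluates the predicate on every wakeup and on timeout.
    if (!cv_.wait_for(lock, timeout, [this] { return running_ < max_concurrent_; }))
      return false;  // timed out while still at the limit
    ++running_;
    return true;
  }

  // Release a slot and wake one waiter.
  void Release()
  {
    {
      std::lock_guard<std::mutex> guard(m_);
      --running_;
    }
    cv_.notify_one();
  }

private:
  std::mutex m_;
  std::condition_variable cv_;
  std::size_t running_ = 0;
  const std::size_t max_concurrent_;
};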
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_exporter.cc
@@ -32,7 +32,7 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options)
options.console_debug,
options.timeout,
options.http_headers,
options.concurrent_sessions)))
options.max_concurrent_requests)))
{}

OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr<OtlpHttpClient> http_client)
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_http_log_exporter.cc
@@ -34,7 +34,7 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &options)
options.console_debug,
options.timeout,
options.http_headers,
options.concurrent_sessions)))
options.max_concurrent_requests)))
{}

OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr<OtlpHttpClient> http_client)
20 changes: 9 additions & 11 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
@@ -10,6 +10,7 @@

# include <atomic>
# include <condition_variable>
# include <cstdint>
# include <memory>
# include <thread>

@@ -124,12 +125,8 @@ class BatchLogProcessor : public LogProcessor
/**
* Exports all logs to the configured exporter.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(const bool was_for_flush_called);
void Export();

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
@@ -143,14 +140,15 @@
struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_;
std::condition_variable cv, force_flush_cv, async_shutdown_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_;
std::atomic<bool> is_force_flush_;
std::atomic<bool> is_force_flush_notified_;
std::atomic<bool> is_async_shutdown_notified_;
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_async_shutdown_notified;
};

/**
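The renamed flags split three states that the old is_force_flush_ conflated: waking the background worker, recording that a flush is pending, and acknowledging that the flush was handled. That split is also why Export() no longer needs the was_force_flush_called parameter: the worker can consult is_force_flush_pending directly. Below is a simplified sketch of a ForceFlush handshake over these flags; it is condensed from, not identical to, the processor's logic, and the helper names are invented:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Simplified handshake state in the spirit of SynchronizationData.
struct FlushSync
{
  std::condition_variable cv, force_flush_cv;
  std::mutex cv_m, force_flush_cv_m;
  std::atomic<bool> is_force_wakeup_background_worker{false};
  std::atomic<bool> is_force_flush_pending{false};
  std::atomic<bool> is_force_flush_notified{false};
};

// Caller side: request a flush and wait for the worker to acknowledge it.
inline bool ForceFlush(FlushSync &sync, std::chrono::microseconds timeout)
{
  sync.is_force_flush_pending.store(true, std::memory_order_release);
  {
    // Wake the background worker so it exports immediately.
    std::lock_guard<std::mutex> guard(sync.cv_m);
    sync.is_force_wakeup_background_worker.store(true, std::memory_order_release);
  }
  sync.cv.notify_one();

  std::unique_lock<std::mutex> lock(sync.force_flush_cv_m);
  bool notified = sync.force_flush_cv.wait_for(lock, timeout, [&sync] {
    return sync.is_force_flush_notified.load(std::memory_order_acquire);
  });
  sync.is_force_flush_notified.store(false, std::memory_order_release);
  return notified;
}

// Worker side: after exporting a batch, acknowledge any pending flush.
inline void NotifyCompletion(FlushSync &sync)
{
  if (sync.is_force_flush_pending.exchange(false, std::memory_order_acq_rel))
  {
    {
      std::lock_guard<std::mutex> guard(sync.force_flush_cv_m);
      sync.is_force_flush_notified.store(true, std::memory_order_release);
    }
    sync.force_flush_cv.notify_all();
  }
}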
19 changes: 8 additions & 11 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
@@ -122,12 +122,8 @@ class BatchSpanProcessor : public SpanProcessor
/**
* Exports all ended spans to the configured exporter.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(const bool was_for_flush_called);
void Export();

/**
* Called when Shutdown() is invoked. Completely drains the queue of all its ended spans and
@@ -141,14 +137,15 @@
struct SynchronizationData
{
/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_, async_shutdown_cv_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_, async_shutdown_m_;
std::condition_variable cv, force_flush_cv, async_shutdown_cv;
std::mutex cv_m, force_flush_cv_m, shutdown_m, async_shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_;
std::atomic<bool> is_force_flush_;
std::atomic<bool> is_force_flush_notified_;
std::atomic<bool> is_async_shutdown_notified_;
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_async_shutdown_notified;
};

/**