Skip to content

Commit

Permalink
Merge branch 'main' into version_proto
Browse files Browse the repository at this point in the history
  • Loading branch information
marcalff authored Aug 3, 2024
2 parents 5405302 + 34ca855 commit a74b322
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ if(WITH_OTLP_GRPC
if(WITH_OTLP_GRPC)
find_package(gRPC)
endif()
if((NOT Protobuf_FOUND AND NOT PROTOBUF_FOUND) OR (NOT gRPC_FOUND))
if((NOT Protobuf_FOUND AND NOT PROTOBUF_FOUND) OR (WITH_OTLP_GRPC
AND NOT gRPC_FOUND))
if(WIN32 AND (NOT DEFINED CMAKE_TOOLCHAIN_FILE))
install_windows_deps()
endif()
Expand Down
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module(
name = "opentelemetry-cpp",
version = "0",
version = "1.16.1",
compatibility_level = 0,
repo_name = "io_opentelemetry_cpp",
)
Expand Down
122 changes: 72 additions & 50 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
#include <thread>
#include <utility>
#include <vector>
#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

#if !defined(__CYGWIN__) && defined(_WIN32)
# ifndef WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -1424,71 +1427,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
return;
}

std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
return;
}

std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;
#endif

while (true)
std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
{
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}
return;
}

if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
break;
}
std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;

while (true)
{
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}

{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
break;
}

if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}

concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}
{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
}

concurrency_file->background_thread_waiter_cv.notify_all();
}
if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}

concurrency_file->background_thread_waiter_cv.notify_all();
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: "
<< e.what() << ".Data writing may experience some delays.");
}
catch (...)
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may "
"experience some delays.");
}
#endif
}

private:
Expand Down

0 comments on commit a74b322

Please sign in to comment.