Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Fix crash in PeriodicExportingMetricReader. #2983

Merged
merged 16 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions api/include/opentelemetry/common/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,37 @@ point.

#endif

// OPENTELEMETRY_HAVE_EXCEPTIONS
//
// Checks whether the compiler both supports and enables exceptions. Many
// compilers support a "no exceptions" mode that disables exceptions.
//
// Generally, when OPENTELEMETRY_HAVE_EXCEPTIONS is not defined:
//
// * Code using `throw` and `try` may not compile.
// * The `noexcept` specifier will still compile and behave as normal.
// * The `noexcept` operator may still return `false`.
//
// For further details, consult the compiler's documentation.
// Resolve OPENTELEMETRY_HAVE_EXCEPTIONS to exactly 1 or 0, unless the build
// has already supplied a definition (in which case it is left untouched).
#if !defined(OPENTELEMETRY_HAVE_EXCEPTIONS)
#  if defined(__clang__) && ((__clang_major__ * 100) + __clang_minor__) < 306
// Clang < 3.6: both the __EXCEPTIONS macro and the cxx_exceptions feature
// have to be present.
// http://releases.llvm.org/3.6.0/tools/clang/docs/ReleaseNotes.html#the-exceptions-macro
#    if defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions)
#      define OPENTELEMETRY_HAVE_EXCEPTIONS 1
#    else
#      define OPENTELEMETRY_HAVE_EXCEPTIONS 0
#    endif
#  elif OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions)
#    define OPENTELEMETRY_HAVE_EXCEPTIONS 1
// Remaining special cases; the default is that exceptions are supported.
// A GNU-compatible compiler must define __EXCEPTIONS or __cpp_exceptions,
// and MSVC must define _CPPUNWIND; any other compiler is assumed to support them.
#  elif (!defined(__GNUC__) || defined(__EXCEPTIONS) || defined(__cpp_exceptions)) && \
      (!defined(_MSC_VER) || defined(_CPPUNWIND))
#    define OPENTELEMETRY_HAVE_EXCEPTIONS 1
#  else
#    define OPENTELEMETRY_HAVE_EXCEPTIONS 0
#  endif
#endif

/*
OPENTELEMETRY_ATTRIBUTE_LIFETIME_BOUND indicates that a resource owned by a function
parameter or implicit object parameter is retained by the return value of the
Expand Down
1 change: 1 addition & 0 deletions examples/plugin/plugin/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "opentelemetry/common/attribute_value.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/context/context_value.h"
#include "opentelemetry/nostd/utility.h"
#include "opentelemetry/trace/span_context.h"
#include "opentelemetry/trace/span_metadata.h"
#include "tracer.h"
Expand Down
22 changes: 12 additions & 10 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
// clang-format on

#include "google/protobuf/message.h"
#include "google/protobuf/reflection.h"
#include "google/protobuf/stubs/common.h"
#include "nlohmann/json.hpp"

// clang-format off
Expand All @@ -28,15 +26,26 @@
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/version.h"

#ifdef _MSC_VER
# include <string.h>
# define strcasecmp _stricmp
#else
# include <strings.h>
#endif

#include <limits.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#if !defined(__CYGWIN__) && defined(_WIN32)
Expand Down Expand Up @@ -64,11 +73,8 @@

#else

# include <dirent.h>
# include <errno.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <sys/types.h>
# include <unistd.h>

# define FS_ACCESS(x) access(x, F_OK)
Expand All @@ -89,10 +95,6 @@
# undef GetMessage
#endif

#ifdef _MSC_VER
# define strcasecmp _stricmp
#endif

#if (defined(_MSC_VER) && _MSC_VER >= 1600) || \
(defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L) || defined(__STDC_LIB_EXT1__)
# ifdef _MSC_VER
Expand Down
77 changes: 56 additions & 21 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <thread>
#include <utility>

#include "opentelemetry/common/macros.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
Expand All @@ -29,6 +30,10 @@
# include <future>
#endif

#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
Expand Down Expand Up @@ -90,31 +95,61 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
bool PeriodicExportingMetricReader::CollectAndExportOnce()
{
std::atomic<bool> cancel_export_for_timeout{false};
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
});

std::future_status status;
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
do
std::unique_ptr<std::thread> task_thread;

#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
#endif
std::promise<void> sender;
auto receiver = sender.get_future();

task_thread.reset(new std::thread([this, &cancel_export_for_timeout] {
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout.load(std::memory_order_acquire))
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< this->export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
}));

std::future_status status;
do
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout.store(true, std::memory_order_release);
break;
}
} while (status != std::future_status::ready);
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect failed with exception "
<< e.what());
return false;
}
catch (...)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect failed with unknown exception");
return false;
}
#endif

if (task_thread && task_thread->joinable())
{
task_thread->join();
}

std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
Expand Down
38 changes: 33 additions & 5 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,25 @@

#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <thread>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockPushMetricExporter : public PushMetricExporter
{
public:
// Stores `wait`; Export() sleeps for that long on every call when it is positive.
MockPushMetricExporter(std::chrono::milliseconds wait)
    : wait_{wait}
{}

// Records a copy of `record` and reports success. When wait_ is positive the
// call first blocks for that duration, which lets a test simulate a slow exporter.
opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override
{
  const bool delay_requested = wait_ > std::chrono::milliseconds::zero();
  if (delay_requested)
  {
    std::this_thread::sleep_for(wait_);
  }

  records_.push_back(record);
  return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Expand All @@ -34,6 +44,7 @@ class MockPushMetricExporter : public PushMetricExporter

private:
std::vector<ResourceMetrics> records_;
std::chrono::milliseconds wait_;
};

class MockMetricProducer : public MetricProducer
Expand Down Expand Up @@ -61,17 +72,34 @@ class MockMetricProducer : public MetricProducer

// Runs a PeriodicExportingMetricReader (500 ms export interval, 200 ms export
// timeout) against a mock exporter and mock producer for 2000 ms, then
// force-flushes, shuts down, and expects the exporter's GetDataCount() to equal
// the producer's GetDataCount().
//
// NOTE(review): `exporter`, `reader`, and the SetMetricProducer / ForceFlush /
// Shutdown calls each appear twice below (a value-style and a shared_ptr-style
// variant). This looks like the removed and added lines of a diff shown
// together — confirm which set is current before compiling.
TEST(PeriodicExporingMetricReader, BasicTests)
{
std::unique_ptr<PushMetricExporter> exporter(new MockPushMetricExporter());
// Zero wait: the mock's Export() does not sleep.
std::unique_ptr<PushMetricExporter> exporter(
new MockPushMetricExporter(std::chrono::milliseconds{0}));
PeriodicExportingMetricReaderOptions options;
options.export_timeout_millis = std::chrono::milliseconds(200);
options.export_interval_millis = std::chrono::milliseconds(500);
// Keep a raw pointer for the final check; `exporter` itself is moved into the reader.
auto exporter_ptr = exporter.get();
PeriodicExportingMetricReader reader(std::move(exporter), options);
std::shared_ptr<PeriodicExportingMetricReader> reader =
std::make_shared<PeriodicExportingMetricReader>(std::move(exporter), options);
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
reader->SetMetricProducer(&producer);
// 2000 ms is four times the configured 500 ms export interval.
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_NO_THROW(reader->ForceFlush());
reader->Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
}

// Exercises the export-timeout path: the mock exporter blocks for 2000 ms per
// Export() call, ten times the 200 ms export timeout configured on the reader.
// The test makes no assertions; it only runs the reader for one second and
// then shuts it down.
TEST(PeriodicExporingMetricReader, Timeout)
{
  const std::chrono::milliseconds export_delay{2000};
  std::unique_ptr<PushMetricExporter> slow_exporter(new MockPushMetricExporter(export_delay));

  PeriodicExportingMetricReaderOptions reader_options;
  reader_options.export_timeout_millis  = std::chrono::milliseconds(200);
  reader_options.export_interval_millis = std::chrono::milliseconds(500);

  auto metric_reader = std::make_shared<PeriodicExportingMetricReader>(std::move(slow_exporter),
                                                                       reader_options);

  MockMetricProducer metric_producer;
  metric_reader->SetMetricProducer(&metric_producer);

  // 1000 ms spans two 500 ms export intervals.
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  metric_reader->Shutdown();
}