Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Valgrind errors on std::atomic variables #2244

Merged
merged 1 commit into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
# ifdef ENABLE_ASYNC_EXPORT
struct SynchronizationData
{
std::atomic<std::size_t> session_counter_;
std::atomic<std::size_t> finished_session_counter_;
std::atomic<std::size_t> session_counter_{0};
std::atomic<std::size_t> finished_session_counter_{0};
std::condition_variable force_flush_cv;
std::mutex force_flush_cv_m;
std::recursive_mutex force_flush_m;
Expand Down
8 changes: 3 additions & 5 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace
{

/**
* This class handles the response message from the Elasticsearch request
* This class handles the response message from the HTTP request
*/
class ResponseHandler : public http_client::EventHandler
{
Expand All @@ -75,9 +75,7 @@ class ResponseHandler : public http_client::EventHandler
ResponseHandler(std::function<bool(opentelemetry::sdk::common::ExportResult)> &&callback,
bool console_debug = false)
: result_callback_{std::move(callback)}, console_debug_{console_debug}
{
stopping_.store(false);
}
{}

std::string BuildResponseLogMessage(http_client::Response &response,
const std::string &body) noexcept
Expand Down Expand Up @@ -356,7 +354,7 @@ class ResponseHandler : public http_client::EventHandler
const opentelemetry::ext::http::client::Session *session_ = nullptr;

// Whether notify has been called
std::atomic<bool> stopping_;
std::atomic<bool> stopping_{false};

// A string to store the response body
std::string body_ = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class Session : public opentelemetry::ext::http::client::Session,
std::string scheme = "http",
const std::string &host = "",
uint16_t port = 80)
: http_client_(http_client), is_session_active_(false)
: http_client_(http_client)
{
host_ = scheme + "://" + host + ":" + std::to_string(port) + "/";
}
Expand Down Expand Up @@ -216,7 +216,7 @@ class Session : public opentelemetry::ext::http::client::Session,
std::unique_ptr<HttpOperation> curl_operation_;
uint64_t session_id_;
HttpClient &http_client_;
std::atomic<bool> is_session_active_;
std::atomic<bool> is_session_active_{false};
};

class HttpClientSync : public opentelemetry::ext::http::client::HttpClientSync
Expand Down Expand Up @@ -352,7 +352,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

std::mutex multi_handle_m_;
CURLM *multi_handle_;
std::atomic<uint64_t> next_session_id_;
std::atomic<uint64_t> next_session_id_{0};
uint64_t max_sessions_per_connection_;

std::mutex sessions_m_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ class HttpOperation

const char *GetCurlErrorMessage(CURLcode code);

std::atomic<bool> is_aborted_; // Set to 'true' when async callback is aborted
std::atomic<bool> is_finished_; // Set to 'true' when async callback is finished.
std::atomic<bool> is_cleaned_; // Set to 'true' when async callback is cleaned.
const bool is_raw_response_; // Do not split response headers from response body
const bool reuse_connection_; // Reuse connection
std::atomic<bool> is_aborted_{false}; // Set to 'true' when async callback is aborted
std::atomic<bool> is_finished_{false}; // Set to 'true' when async callback is finished.
std::atomic<bool> is_cleaned_{false}; // Set to 'true' when async callback is cleaned.
const bool is_raw_response_; // Do not split response headers from response body
const bool reuse_connection_; // Reuse connection
const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms

char curl_error_message_[CURL_ERROR_SIZE];
Expand Down Expand Up @@ -311,7 +311,7 @@ class HttpOperation

std::thread::id callback_thread;
std::function<void(HttpOperation &)> callback;
std::atomic<bool> is_promise_running;
std::atomic<bool> is_promise_running{false};
std::promise<CURLcode> result_promise;
std::future<CURLcode> result_future;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class TracezDataAggregator

/** A boolean that is set to true in the constructor and false in the
* destructor to start and end execution of aggregate spans **/
std::atomic<bool> execute_;
std::atomic<bool> execute_{false};

/** Thread that executes aggregate spans at regurlar intervals during this
object's lifetime**/
Expand Down
4 changes: 2 additions & 2 deletions ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRe
protected:
HTTP_SERVER_NS::HttpServer server_;
std::string server_address_;
std::atomic<bool> is_setup_;
std::atomic<bool> is_running_;
std::atomic<bool> is_setup_{false};
std::atomic<bool> is_running_{false};
std::vector<HTTP_SERVER_NS::HttpRequest> received_requests_;
std::mutex mtx_requests;
std::condition_variable cv_got_events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ class BatchLogRecordProcessor : public LogRecordProcessor
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SimpleLogRecordProcessor : public LogRecordProcessor
// The lock used to ensure the exporter is not called concurrently
opentelemetry::common::SpinLockMutex lock_;
// The atomic boolean to ensure the ShutDown() function is only called once
std::atomic<bool> is_shutdown_;
std::atomic<bool> is_shutdown_{false};
};
} // namespace logs
} // namespace sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;

/* Synchronization primitives */
std::atomic<bool> is_force_flush_pending_;
std::atomic<bool> is_force_wakeup_background_worker_;
std::atomic<bool> is_force_flush_notified_;
std::atomic<bool> is_force_flush_pending_{false};
std::atomic<bool> is_force_wakeup_background_worker_{false};
std::atomic<bool> is_force_flush_notified_{false};
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_m_;
};
Expand Down
10 changes: 5 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ class BatchSpanProcessor : public SpanProcessor
std::mutex cv_m, force_flush_cv_m, shutdown_m;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_force_wakeup_background_worker;
std::atomic<bool> is_force_flush_pending;
std::atomic<bool> is_force_flush_notified;
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us;
std::atomic<bool> is_shutdown;
std::atomic<bool> is_force_wakeup_background_worker{false};
std::atomic<bool> is_force_flush_pending{false};
std::atomic<bool> is_force_flush_notified{false};
std::atomic<std::chrono::microseconds::rep> force_flush_timeout_us{0};
std::atomic<bool> is_shutdown{false};
};

/**
Expand Down
15 changes: 2 additions & 13 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(
buffer_(max_queue_size_),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->is_shutdown.store(false);
}
{}

BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr<LogRecordExporter> &&exporter,
const BatchLogRecordProcessorOptions &options)
Expand All @@ -46,13 +41,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr<LogRecordExport
buffer_(options.max_queue_size),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
{}

std::unique_ptr<Recordable> BatchLogRecordProcessor::MakeRecordable() noexcept
{
Expand Down
8 changes: 1 addition & 7 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
buffer_(max_queue_size_),
synchronization_data_(std::make_shared<SynchronizationData>()),
worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this)
{
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
{}

std::unique_ptr<Recordable> BatchSpanProcessor::MakeRecordable() noexcept
{
Expand Down