Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ticket spin locks will now devolve to using locks after a specific number of spins #193

Merged
merged 9 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ set(SOURCE_FILES
aws/utils/io_service_executor.h
aws/utils/logging.cc
aws/utils/logging.h
aws/utils/spin_lock.cc
aws/utils/spin_lock.h
aws/utils/time_sensitive.h
aws/utils/time_sensitive_queue.h
Expand Down Expand Up @@ -104,8 +105,8 @@ set(TESTS_SOURCE
aws/metrics/test/accumulator_test.cc
aws/metrics/test/metric_test.cc
aws/metrics/test/metrics_manager_test.cc
aws/auth/test/mutable_static_creds_provider_test.cc)

aws/auth/test/mutable_static_creds_provider_test.cc
)

set(THIRD_PARTY_LIBS third_party/lib)
set(THIRD_PARTY_INCLUDE third_party/include)
Expand Down
37 changes: 37 additions & 0 deletions aws/utils/spin_lock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Amazon Software License (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/asl
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

#include "spin_lock.h"

using namespace aws::utils;
#ifdef DEBUG
namespace {
thread_local TicketSpinLock::DebugStats debug_stats;
}

void TicketSpinLock::add_acquired() {
debug_stats.acquired_count++;
}

void TicketSpinLock::add_acquired_lock() {
debug_stats.acquired_with_lock++;
}

void TicketSpinLock::add_spin() {
debug_stats.total_spins++;
}

TicketSpinLock::DebugStats TicketSpinLock::get_debug_stats() {
return debug_stats;
}
#endif
79 changes: 76 additions & 3 deletions aws/utils/spin_lock.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Amazon Software License (the "License").
// You may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,10 @@
#define AWS_UTILS_SPIN_LOCK_H_

#include <atomic>

#include <cstdint>
#include <mutex>
#include <condition_variable>
#include <array>
#include <boost/noncopyable.hpp>

namespace aws {
Expand All @@ -27,22 +30,92 @@ class TicketSpinLock : boost::noncopyable {
: now_serving_(0),
next_ticket_(0) {}

#ifdef DEBUG
struct DebugStats {
std::uint64_t acquired_count;
std::uint64_t acquired_with_lock;
std::uint64_t total_spins;
};

void add_acquired();
void add_acquired_lock();
void add_spin();

DebugStats get_debug_stats();
#else
#define add_acquired();
#define add_acquired_lock();
#define add_spin();
#endif


inline void lock() noexcept {
size_t my_ticket = next_ticket_.fetch_add(1);
while (now_serving_ != my_ticket);
std::uint32_t spin_count = 0;
do {
add_spin();
spin_count++;
if (spin_count > kMaxSpinCount) {
//
// Condition variables in C++ are interesting. They require you take a lock before waiting on the variable.
// Once you start waiting the lock you originally took is released, which will allow the notifier to pick up
// the lock on the notification step.
//
std::unique_lock<std::mutex> lock(lock_for_ticket(my_ticket));
//
// We start waiting on the curent lock holder to notify us that they have completed their work.
// When the lock holder triggers the notification the provided lambda is used to determine whether this
// thread should wake up, or instead re-enter the blocked state. This prevents us from needing to
// re-enter this blocked state explicitly.
//
cv_for_ticket(my_ticket).wait(lock, [this, &my_ticket] { return now_serving_ == my_ticket; });
spin_count = 0;
add_acquired_lock();
}
} while (now_serving_ != my_ticket);
add_acquired();
}

inline void unlock() noexcept {
now_serving_++;
std::unique_lock<std::mutex> lock(lock_for_ticket(now_serving_));
cv_for_ticket(now_serving_).notify_all();
}

private:
inline std::size_t lock_shard(const std::size_t& ticket) noexcept {
return ticket % kLockShards;
}
inline std::mutex& lock_for_ticket(const std::size_t& ticket) noexcept {
return condition_locks_[lock_shard(ticket)];
}
inline std::condition_variable& cv_for_ticket(const std::size_t& ticket) noexcept {
return unlocked_cv_[lock_shard(ticket)];
}
static const std::uint32_t kMaxSpinCount = 100;

static const std::size_t kLockShards = 31;
std::array<std::mutex, kLockShards> condition_locks_;
std::array<std::condition_variable, kLockShards> unlocked_cv_;
std::atomic<size_t> now_serving_;
std::atomic<size_t> next_ticket_;


};

class SpinLock : boost::noncopyable {
public:

#ifdef DEBUG
struct DebugStats {
std::uint64_t acquired_count;
std::uint64_t acquired_with_lock;
std::uint64_t total_spins;
};
DebugStats get_debug_stats() {
return DebugStats();
}
#endif
inline void lock() noexcept {
while (!try_lock());
}
Expand Down
6 changes: 4 additions & 2 deletions aws/utils/test/concurrent_linked_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(Destructor) {
}

BOOST_AUTO_TEST_CASE(Concurrent) {
const size_t total_num_items = 1 << 22;
const size_t total_num_items = 1 << 20;

auto f = [&](const size_t num_threads) {
LOG(info) << "Starting concurrent queue test for " << num_threads
Expand Down Expand Up @@ -152,8 +152,10 @@ BOOST_AUTO_TEST_CASE(Concurrent) {
return end - start;
};

const std::size_t kThreadCount = 128;

for (size_t num_threads = 2;
num_threads < std::max(aws::thread::hardware_concurrency(), 8u);
num_threads < kThreadCount;
num_threads *= 2) {
auto duration = f(num_threads);
auto nanos =
Expand Down
29 changes: 26 additions & 3 deletions aws/utils/test/spin_lock_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Amazon Software License (the "License").
// You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,16 +33,25 @@ void test(const std::string test_name,
Mutex mutex;
volatile size_t counter = 0;
std::vector<aws::thread> threads;
#ifdef DEBUG
std::vector<typename Mutex::DebugStats> debug_stats;
debug_stats.resize(num_threads);
#endif

std::size_t thread_count = 0;
while (threads.size() < num_threads) {
threads.push_back(aws::thread([&] {
std::size_t thread_index = thread_count++;
threads.push_back(aws::thread([&, thread_index] {
aws::utils::sleep_until(start);
for (size_t i = 0; i < cycles_per_thread; i++) {
aws::lock_guard<Mutex> lk(mutex);
for (size_t j = 0; j < loop_per_cycle; j++) {
counter++;
}
}
#ifdef DEBUG
debug_stats[thread_index] = mutex.get_debug_stats();
#endif
}));
}

Expand All @@ -58,6 +67,20 @@ void test(const std::string test_name,
LOG(info) << test_name << ": " << num_threads << " threads: "
<< cycles_per_thread * num_threads / seconds
<< " ops per sec";

#ifdef DEBUG
LOG(info) << test_name << ": DebugStats";
LOG(info) << "\t"
<< std::setw(20) << "Acquired Count"
<< std::setw(20) << "Acquired with Lock"
<< std::setw(20) << "Total Spins";
std::for_each(debug_stats.begin(), debug_stats.end(), [](typename Mutex::DebugStats& d) {
LOG(info) << "\t"
<< std::setw(20) << d.acquired_count
<< std::setw(20) << d.acquired_with_lock
<< std::setw(20) << d.total_spins;
});
#endif
}

} //namespace
Expand All @@ -71,7 +94,7 @@ BOOST_AUTO_TEST_CASE(SpinLock) {
}

BOOST_AUTO_TEST_CASE(TicketSpinLock) {
for (size_t i : {1, 4, 8}) {
for (size_t i : {1, 4, 8, 16, 32}) {
test<aws::utils::TicketSpinLock, 100000>("TicketSpinLock", i);
}
}
Expand Down
2 changes: 1 addition & 1 deletion java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.12.8</version>
<version>0.12.9-SNAPSHOT</version>
<name>Amazon Kinesis Producer Library</name>

<scm>
Expand Down