Skip to content

Commit

Permalink
Merge pull request #1028 from TheMarex/tbb-port
Browse files Browse the repository at this point in the history
Port from OpenMP to TBB
  • Loading branch information
DennisOSRM committed May 22, 2014
2 parents 7d7cce5 + e2daf5c commit 885dbe1
Show file tree
Hide file tree
Showing 14 changed files with 581 additions and 217 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ install:
- sudo apt-add-repository -y ppa:ubuntu-toolchain-r/test
- sudo add-apt-repository -y ppa:boost-latest/ppa
- sudo apt-get update >/dev/null
- sudo apt-get -q install libprotoc-dev libprotobuf7 libprotobuf-dev libosmpbf-dev libbz2-dev libstxxl-dev libstxxl1 libxml2-dev libzip-dev lua5.1 liblua5.1-0-dev rubygems
- sudo apt-get -q install libprotoc-dev libprotobuf7 libprotobuf-dev libosmpbf-dev libbz2-dev libstxxl-dev libstxxl1 libxml2-dev libzip-dev lua5.1 liblua5.1-0-dev rubygems libtbb-dev
- sudo apt-get -q install g++-4.7
- sudo apt-get install libboost1.54-all-dev
#luabind
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ target_link_libraries(osrm-datastore ${Boost_LIBRARIES} UUID GITDESCRIPTION COOR
find_package(Threads REQUIRED)
target_link_libraries(osrm-extract ${CMAKE_THREAD_LIBS_INIT})

find_package(TBB REQUIRED)
target_link_libraries(osrm-extract ${TBB_LIBRARIES})
target_link_libraries(osrm-prepare ${TBB_LIBRARIES})
include_directories(${TBB_INCLUDE_DIR})

find_package(Lua52)
if(NOT LUA52_FOUND)
find_package(Lua51 REQUIRED)
Expand Down
198 changes: 119 additions & 79 deletions Contractor/Contractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "../Util/OpenMPWrapper.h"
#include "../Util/SimpleLogger.h"
#include "../Util/StringUtil.h"
#include "../Util/TimingUtil.h"

#include <boost/assert.hpp>

#include <tbb/enumerable_thread_specific.h>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <limits>
#include <vector>
Expand Down Expand Up @@ -125,6 +129,28 @@ class Contractor
bool is_independent : 1;
};


struct ThreadDataContainer
{
ThreadDataContainer(int number_of_nodes) : number_of_nodes(number_of_nodes) {}

inline ContractorThreadData* getThreadData()
{
bool exists = false;
auto& ref = data.local(exists);
if (!exists)
{
ref = std::make_shared<ContractorThreadData>(number_of_nodes);
}

return ref.get();
}

int number_of_nodes;
typedef tbb::enumerable_thread_specific<std::shared_ptr<ContractorThreadData>> EnumerableThreadData;
EnumerableThreadData data;
};

public:
template <class ContainerT> Contractor(int nodes, ContainerT &input_edge_list)
{
Expand Down Expand Up @@ -262,39 +288,51 @@ class Contractor

void Run()
{
// for the preperation we can use a big grain size, which is much faster (probably cache)
constexpr size_t InitGrainSize = 100000;
constexpr size_t PQGrainSize = 100000;
// auto_partitioner will automatically increase the blocksize if we have
// a lot of data. It is *important* for the last loop iterations
// (which have a very small dataset) that it is devisible.
constexpr size_t IndependentGrainSize = 1;
constexpr size_t ContractGrainSize = 1;
constexpr size_t NeighboursGrainSize = 1;
constexpr size_t DeleteGrainSize = 1;

const NodeID number_of_nodes = contractor_graph->GetNumberOfNodes();
Percent p(number_of_nodes);

const unsigned thread_count = omp_get_max_threads();
std::vector<ContractorThreadData *> thread_data_list;
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
{
thread_data_list.push_back(new ContractorThreadData(number_of_nodes));
}
std::cout << "Contractor is using " << thread_count << " threads" << std::endl;
ThreadDataContainer thread_data_list(number_of_nodes);

NodeID number_of_contracted_nodes = 0;
std::vector<RemainingNodeData> remaining_nodes(number_of_nodes);
std::vector<float> node_priorities(number_of_nodes);
std::vector<NodePriorityData> node_data(number_of_nodes);

// initialize priorities in parallel
#pragma omp parallel for schedule(guided)
for (int x = 0; x < (int)number_of_nodes; ++x)
{
remaining_nodes[x].id = x;
}

// initialize priorities in parallel
tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, InitGrainSize),
[&remaining_nodes](const tbb::blocked_range<int>& range)
{
for (int x = range.begin(); x != range.end(); ++x)
{
remaining_nodes[x].id = x;
}
}
);


std::cout << "initializing elimination PQ ..." << std::flush;
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp parallel for schedule(guided)
for (int x = 0; x < (int)number_of_nodes; ++x)
tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, PQGrainSize),
[this, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int>& range)
{
node_priorities[x] = EvaluateNodePriority(data, &node_data[x], x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int x = range.begin(); x != range.end(); ++x)
{
node_priorities[x] = this->EvaluateNodePriority(data, &node_data[x], x);
}
}
}
);
std::cout << "ok" << std::endl << "preprocessing " << number_of_nodes << " nodes ..."
<< std::flush;

Expand All @@ -309,11 +347,7 @@ class Contractor
std::cout << " [flush " << number_of_contracted_nodes << " nodes] " << std::flush;

// Delete old heap data to free memory that we need for the coming operations
for (ContractorThreadData *data : thread_data_list)
{
delete data;
}
thread_data_list.clear();
thread_data_list.data.clear();

// Create new priority array
std::vector<float> new_node_priority(remaining_nodes.size());
Expand Down Expand Up @@ -396,61 +430,69 @@ class Contractor

// INFO: MAKE SURE THIS IS THE LAST OPERATION OF THE FLUSH!
// reinitialize heaps and ThreadData objects with appropriate size
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
{
thread_data_list.push_back(
new ContractorThreadData(contractor_graph->GetNumberOfNodes()));
}
thread_data_list.number_of_nodes = contractor_graph->GetNumberOfNodes();
}

const int last = (int)remaining_nodes.size();
#pragma omp parallel
{
// determine independent node set
ContractorThreadData *const data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided)
for (int i = 0; i < last; ++i)
tbb::parallel_for(tbb::blocked_range<int>(0, last, IndependentGrainSize),
[this, &node_priorities, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
const NodeID node = remaining_nodes[i].id;
remaining_nodes[i].is_independent =
IsNodeIndependent(node_priorities, data, node);
ContractorThreadData *data = thread_data_list.getThreadData();
// determine independent node set
for (int i = range.begin(); i != range.end(); ++i)
{
const NodeID node = remaining_nodes[i].id;
remaining_nodes[i].is_independent =
this->IsNodeIndependent(node_priorities, data, node);
}
}
}
);

const auto first = stable_partition(remaining_nodes.begin(),
remaining_nodes.end(),
[](RemainingNodeData node_data)
{ return !node_data.is_independent; });
const int first_independent_node = first - remaining_nodes.begin();
// contract independent nodes
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)

// contract independent nodes
tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, ContractGrainSize),
[this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
NodeID x = remaining_nodes[position].id;
ContractNode<false>(data, x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
const NodeID x = remaining_nodes[position].id;
this->ContractNode<false>(data, x);
}
}

std::sort(data->inserted_edges.begin(), data->inserted_edges.end());
}
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)
);
// make sure we really sort each block
tbb::parallel_for(thread_data_list.data.range(),
[&](const ThreadDataContainer::EnumerableThreadData::range_type& range)
{
NodeID x = remaining_nodes[position].id;
DeleteIncomingEdges(data, x);
for (auto& data : range)
std::sort(data->inserted_edges.begin(),
data->inserted_edges.end());
}
}
);
tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, DeleteGrainSize),
[this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
const NodeID x = remaining_nodes[position].id;
this->DeleteIncomingEdges(data, x);
}
}
);

// insert new edges
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
for (auto& data : thread_data_list.data)
{
ContractorThreadData &data = *thread_data_list[thread_id];
for (const ContractorEdge &edge : data.inserted_edges)
for (const ContractorEdge &edge : data->inserted_edges)
{
auto current_edge_ID = contractor_graph->FindEdge(edge.source, edge.target);
const EdgeID current_edge_ID = contractor_graph->FindEdge(edge.source, edge.target);
if (current_edge_ID < contractor_graph->EndEdges(edge.source))
{
ContractorGraph::EdgeData &current_data =
Expand All @@ -466,19 +508,21 @@ class Contractor
}
contractor_graph->InsertEdge(edge.source, edge.target, edge.data);
}
data.inserted_edges.clear();
data->inserted_edges.clear();
}
// update priorities
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)

tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, NeighboursGrainSize),
[this, &remaining_nodes, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int>& range)
{
NodeID x = remaining_nodes[position].id;
UpdateNodeNeighbours(node_priorities, node_data, data, x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
NodeID x = remaining_nodes[position].id;
this->UpdateNodeNeighbours(node_priorities, node_data, data, x);
}
}
}
);

// remove contracted nodes from the pool
number_of_contracted_nodes += last - first_independent_node;
remaining_nodes.resize(first_independent_node);
Expand Down Expand Up @@ -510,11 +554,8 @@ class Contractor

p.printStatus(number_of_contracted_nodes);
}
for (ContractorThreadData *data : thread_data_list)
{
delete data;
}
thread_data_list.clear();

thread_data_list.data.clear();
}

template <class Edge> inline void GetEdges(DeallocatingVector<Edge> &edges)
Expand Down Expand Up @@ -769,7 +810,6 @@ class Contractor
true,
true,
false);
;
inserted_edges.push_back(new_edge);
std::swap(new_edge.source, new_edge.target);
new_edge.data.forward = false;
Expand Down
3 changes: 1 addition & 2 deletions Contractor/TemporaryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "TemporaryStorage.h"

StreamData::StreamData()
: write_mode(true), temp_path(boost::filesystem::unique_path(temp_directory.append(
TemporaryFilePattern.begin(), TemporaryFilePattern.end()))),
: write_mode(true), temp_path(boost::filesystem::unique_path(temp_directory / TemporaryFilePattern)),
temp_file(new boost::filesystem::fstream(
temp_path, std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary)),
readWriteMutex(std::make_shared<boost::mutex>())
Expand Down
5 changes: 2 additions & 3 deletions DataStructures/DynamicGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <algorithm>
#include <limits>
#include <vector>
#include <atomic>

template <typename EdgeDataT> class DynamicGraph
{
Expand Down Expand Up @@ -198,7 +199,6 @@ template <typename EdgeDataT> class DynamicGraph
void DeleteEdge(const NodeIterator source, const EdgeIterator e)
{
Node &node = m_nodes[source];
#pragma omp atomic
--m_numEdges;
--node.edges;
BOOST_ASSERT(std::numeric_limits<unsigned>::max() != node.edges);
Expand Down Expand Up @@ -226,7 +226,6 @@ template <typename EdgeDataT> class DynamicGraph
}
}

#pragma omp atomic
m_numEdges -= deleted;
m_nodes[source].edges -= deleted;

Expand Down Expand Up @@ -272,7 +271,7 @@ template <typename EdgeDataT> class DynamicGraph
};

NodeIterator m_numNodes;
EdgeIterator m_numEdges;
std::atomic_uint m_numEdges;

std::vector<Node> m_nodes;
DeallocatingVector<Edge> m_edges;
Expand Down
5 changes: 2 additions & 3 deletions DataStructures/Percent.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "../Util/OpenMPWrapper.h"
#include <iostream>
#include <atomic>

class Percent
{
Expand Down Expand Up @@ -61,20 +62,18 @@ class Percent

void printIncrement()
{
#pragma omp atomic
++m_current_value;
printStatus(m_current_value);
}

void printAddition(const unsigned addition)
{
#pragma omp atomic
m_current_value += addition;
printStatus(m_current_value);
}

private:
unsigned m_current_value;
std::atomic_uint m_current_value;
unsigned m_max_value;
unsigned m_percent_interval;
unsigned m_next_threshold;
Expand Down
Loading

0 comments on commit 885dbe1

Please sign in to comment.