Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event handling: fix a bug and clean house #2421

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions arbor/backends/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <arbor/fvm_types.hpp>
#include <arbor/serdes.hpp>
#include <arbor/mechanism_abi.h>
#include <arbor/generic_event.hpp>

// Structures for the representation of event delivery targets and
// staged events.
Expand Down Expand Up @@ -46,9 +45,6 @@ struct deliverable_event {
ARB_SERDES_ENABLE(deliverable_event, time, weight, handle);
};

template<>
struct has_event_index<deliverable_event> : public std::true_type {};

// Subset of event information required for mechanism delivery.
struct deliverable_event_data {
cell_local_size_type mech_index; // same as target_handle::mech_index
Expand All @@ -61,11 +57,6 @@ struct deliverable_event_data {
weight);
};

// Stream index accessor function for multi_event_stream:
inline cell_local_size_type event_index(const arb_deliverable_event_data& ed) {
return ed.mech_index;
}

// Delivery data accessor function for multi_event_stream:
inline arb_deliverable_event_data event_data(const deliverable_event& ev) {
return {ev.handle.mech_index, ev.weight};
Expand Down
99 changes: 79 additions & 20 deletions arbor/backends/event_stream_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

#include <vector>

#include <arbor/generic_event.hpp>
#include <arbor/mechanism_abi.h>


#include "backends/event.hpp"
#include "backends/event_stream_state.hpp"
#include "event_lane.hpp"
Expand All @@ -18,15 +16,13 @@ namespace arb {

template <typename Event>
struct event_stream_base {
using size_type = std::size_t;
using event_type = Event;
using event_time_type = ::arb::event_time_type<Event>;
using event_data_type = ::arb::event_data_type<Event>;
using event_data_type = decltype(event_data(std::declval<Event>()));

protected: // members
std::vector<event_data_type> ev_data_;
std::vector<std::size_t> ev_spans_ = {0};
size_type index_ = 0;
std::size_t index_ = 0;
event_data_type* base_ptr_ = nullptr;

public:
Expand Down Expand Up @@ -62,24 +58,32 @@ struct event_stream_base {
index_ = 0;
}

// Construct a mapping of mech_id to a stream s.t. streams are partitioned into
// time step buckets by `ev_span`
protected:
// backend specific initializations
virtual void init() = 0;
};

struct spike_event_stream_base : event_stream_base<deliverable_event> {
template<typename EventStream>
static std::enable_if_t<std::is_base_of_v<event_stream_base, EventStream>>
multi_event_stream(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
friend void initialize(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
arb_assert(lanes.size() < divs.size());

// reset streams and allocate sufficient space for temporaries
auto n_steps = steps.size();
std::unordered_map<unsigned, std::vector<std::size_t>> dt_sizes;
for (auto& [k, v]: streams) {
v.clear();
dt_sizes[k].resize(n_steps, 0);
v.spike_counter_.clear();
v.spike_counter_.resize(steps.size(), 0);
v.spikes_.clear();
// ev_data_ has been cleared during v.clear(), so we use its capacity
v.spikes_.reserve(v.ev_data_.capacity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using capacity here, that seems unusual.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is intentional, as ev_data_ has been cleared before. I should probably make a comment. Otherwise, I can reorder the operations, so that v.clear() is called afterwards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be confusing, just make a note.

}

// loop over lanes: group events by mechanism and sort them by time
auto cell = 0;
for (const auto& lane: lanes) {
auto div = divs[cell];
Expand All @@ -94,16 +98,71 @@ struct event_stream_base {
if (step >= n_steps) break;
arb_assert(div + target < handles.size());
const auto& handle = handles[div + target];
streams[handle.mech_id].ev_data_.push_back({handle.mech_index, weight});
dt_sizes[handle.mech_id][step]++;
auto& stream = streams[handle.mech_id];
stream.spikes_.push_back(spike_data{step, handle.mech_index, time, weight});
// insertion sort with last element as pivot
// ordering: first w.r.t. step, within a step: mech_index, within a mech_index: time
auto first = stream.spikes_.begin();
auto last = stream.spikes_.end();
auto pivot = std::prev(last, 1);
std::rotate(std::upper_bound(first, pivot, *pivot), pivot, last);
// increment count in current time interval
stream.spike_counter_[step]++;
}
}

for (auto& [id, stream]: streams) {
util::make_partition(stream.ev_spans_, dt_sizes[id]);
stream.init();
// copy temporary deliverable_events into stream's ev_data_
stream.ev_data_.reserve(stream.spikes_.size());
std::transform(stream.spikes_.begin(), stream.spikes_.end(), std::back_inserter(stream.ev_data_),
[](auto const& e) noexcept -> arb_deliverable_event_data {
return {e.mech_index, e.weight}; });
// scan over spike_counter_ and write the resulting partition to ev_spans_
util::make_partition(stream.ev_spans_, stream.spike_counter_);
// delegate to derived class init: static cast necessary to access protected init()
static_cast<spike_event_stream_base&>(stream).init();
}
}

protected: // members
struct spike_data {
arb_size_type step = 0;
cell_local_size_type mech_index = 0;
time_type time = 0;
float weight = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
float weight = 0;
float weight = 0;
auto operator<=>(const spike_data&) const = default;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

absolutely, thanks for pointing this out

auto operator<=>(spike_data const&) const noexcept = default;
};
std::vector<spike_data> spikes_;
std::vector<std::size_t> spike_counter_;
};

// Event stream specialization for sample events. Unlike spike events,
// `staged` arrives already partitioned into timestep bins (outer vector),
// so no per-bin sorting is needed here.
struct sample_event_stream_base : event_stream_base<sample_event> {
// Populate `stream` from `staged` sample events and hand off to the
// backend-specific init(). Leaves the stream cleared (and returns early)
// when there are no bins or no events.
friend void initialize(const std::vector<std::vector<sample_event>>& staged,
sample_event_stream_base& stream) {
// clear previous data
stream.clear();

// return if there are no timestep bins
if (!staged.size()) return;

// return if there are no events
auto num_events = util::sum_by(staged, [] (const auto& v) {return v.size();});
if (!num_events) return;

// allocate space for spans and data
// NOTE(review): +1 assumes clear() leaves the leading sentinel offset in
// ev_spans_ (member is initialized to {0}) — confirm against clear().
stream.ev_spans_.reserve(staged.size() + 1);
stream.ev_data_.reserve(num_events);

// add event data and spans
// Each bin appends its raw payloads, then records the running end offset,
// producing a partition of ev_data_ by timestep.
for (const auto& v : staged) {
for (const auto& ev: v) stream.ev_data_.push_back(ev.raw);
stream.ev_spans_.push_back(stream.ev_data_.size());
}

arb_assert(num_events == stream.ev_data_.size());
arb_assert(staged.size() + 1 == stream.ev_spans_.size());
// delegate backend-specific setup (e.g. device copy on the GPU backend)
stream.init();
}
};

} // namespace arb
126 changes: 20 additions & 106 deletions arbor/backends/gpu/event_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,33 @@

// Indexed collection of pop-only event queues --- CUDA back-end implementation.

#include <arbor/mechanism_abi.h>

#include "backends/event_stream_base.hpp"
#include "util/transform.hpp"
#include "threading/threading.hpp"
#include "timestep_range.hpp"
#include "memory/memory.hpp"

namespace arb {
namespace gpu {

template <typename Event>
struct event_stream: public event_stream_base<Event> {
public:
using base = event_stream_base<Event>;
using size_type = typename base::size_type;
using event_data_type = typename base::event_data_type;
using device_array = memory::device_vector<event_data_type>;

using base::clear;
using base::ev_data_;
using base::ev_spans_;
using base::base_ptr_;

event_stream() = default;
event_stream(task_system_handle t): base(), thread_pool_{t} {}

// Initialize event streams from a vector of vector of events
// Outer vector represents time step bins
void init(const std::vector<std::vector<Event>>& staged) {
// clear previous data
clear();

// return if there are no timestep bins
if (!staged.size()) return;

// return if there are no events
const size_type num_events = util::sum_by(staged, [] (const auto& v) {return v.size();});
if (!num_events) return;

// allocate space for spans and data
ev_spans_.resize(staged.size() + 1);
ev_data_.resize(num_events);
resize(device_ev_data_, num_events);

// compute offsets by exclusive scan over staged events
util::make_partition(ev_spans_,
util::transform_view(staged, [](const auto& v) { return v.size(); }),
0ull);

// assign, copy to device (and potentially sort) the event data in parallel
arb_assert(thread_pool_);
arb_assert(ev_spans_.size() == staged.size() + 1);
threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this, &staged](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);
const auto len = end - beg;

auto host_span = memory::make_view(ev_data_)(beg, end);

// make event data and copy
std::copy_n(util::transform_view(staged[i],
[](const auto& x) { return event_data(x); }).begin(),
len,
host_span.begin());
// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
auto device_span = memory::make_view(device_ev_data_)(beg, end);
memory::copy_async(host_span, device_span);
});

base_ptr_ = device_ev_data_.data();
template<typename BaseEventStream>
struct event_stream : BaseEventStream {
public:
ARB_SERDES_ENABLE(event_stream<BaseEventStream>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

arb_assert(num_events == device_ev_data_.size());
arb_assert(num_events == ev_data_.size());
protected:
void init() override final {
resize(this->device_ev_data_, this->device_ev_data_.size());
memory::copy_async(this->ev_data_, this->device_ev_data_);
this->base_ptr_ = this->device_ev_data_.data();
}

// Initialize event stream assuming ev_data_ and ev_span_ has
// been set previously (e.g. by `base::multi_event_stream`)
void init() {
resize(device_ev_data_, ev_data_.size());
base_ptr_ = device_ev_data_.data();

threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);

auto host_span = memory::make_view(ev_data_)(beg, end);
auto device_span = memory::make_view(device_ev_data_)(beg, end);
private: // device memory
using event_data_type = typename BaseEventStream::event_data_type;
using device_array = memory::device_vector<event_data_type>;

// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, device_span);
});
}
device_array device_ev_data_;

template<typename D>
static void resize(D& d, std::size_t size) {
Expand All @@ -117,16 +37,10 @@ struct event_stream: public event_stream_base<Event> {
d = D(size);
}
}

ARB_SERDES_ENABLE(event_stream<Event>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

task_system_handle thread_pool_;
device_array device_ev_data_;
};

using spike_event_stream = event_stream<spike_event_stream_base>;
using sample_event_stream = event_stream<sample_event_stream_base>;

} // namespace gpu
} // namespace arb
2 changes: 0 additions & 2 deletions arbor/backends/gpu/fvm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ struct backend {
using threshold_watcher = arb::gpu::threshold_watcher;
using cable_solver = arb::gpu::matrix_state_fine<arb_value_type, arb_index_type>;
using diffusion_solver = arb::gpu::diffusion_state<arb_value_type, arb_index_type>;
using deliverable_event_stream = arb::gpu::deliverable_event_stream;
using sample_event_stream = arb::gpu::sample_event_stream;

using shared_state = arb::gpu::shared_state;
using ion_state = arb::gpu::ion_state;
Expand Down
3 changes: 0 additions & 3 deletions arbor/backends/gpu/gpu_store_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ using array = memory::device_vector<arb_value_type>;
using iarray = memory::device_vector<arb_index_type>;
using sarray = memory::device_vector<arb_size_type>;

using deliverable_event_stream = arb::gpu::event_stream<deliverable_event>;
using sample_event_stream = arb::gpu::event_stream<sample_event>;

} // namespace gpu
} // namespace arb

12 changes: 2 additions & 10 deletions arbor/backends/gpu/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ shared_state::shared_state(task_system_handle tp,
time_since_spike(n_cell*n_detector),
src_to_spike(make_const_view(src_to_spike_)),
cbprng_seed(cbprng_seed_),
sample_events(thread_pool),
sample_events(),
watcher{n_cv_, src_to_spike.data(), detector_info}
{
memory::fill(time_since_spike, -1.0);
Expand Down Expand Up @@ -240,7 +240,7 @@ void shared_state::instantiate(mechanism& m,

if (storage.count(id)) throw arb::arbor_internal_error("Duplicate mech id in shared state");
auto& store = storage.emplace(id, mech_storage{}).first->second;
streams[id] = deliverable_event_stream{thread_pool};
streams[id] = spike_event_stream{};

// Allocate view pointers
store.state_vars_ = std::vector<arb_value_type*>(m.mech_.n_state_vars);
Expand Down Expand Up @@ -388,14 +388,6 @@ void shared_state::take_samples() {
}
}

void shared_state::init_events(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<size_t>& divs,
const timestep_range& dts) {
arb::gpu::event_stream<deliverable_event>::multi_event_stream(lanes, handles, divs, dts, streams);
}


// Debug interface
ARB_ARBOR_API std::ostream& operator<<(std::ostream& o, shared_state& s) {
using io::csv;
Expand Down
Loading