Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RPC] Asynchronous signals. #177

Merged
merged 4 commits into from
Mar 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/cocaine/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>

Expand Down
38 changes: 12 additions & 26 deletions include/cocaine/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,19 @@
#include "cocaine/context/mapper.hpp"
#include "cocaine/context/signal.hpp"

#include "cocaine/idl/context.hpp"

#include "cocaine/locked_ptr.hpp"
#include "cocaine/repository.hpp"

#include <blackhole/blackhole.hpp>

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/optional.hpp>
#include <boost/signals2/signal.hpp>

namespace cocaine {

// Context

namespace signals = boost::signals2;

class actor_t;
class execution_unit_t;

Expand All @@ -65,6 +63,9 @@ class context_t {
// because services are allowed to start and stop other services during their lifetime.
synchronized<service_list_t> m_services;

// Context signalling hub.
retroactive_signal<io::context_tag> m_signals;

#ifdef COCAINE_ALLOW_RAFT
std::unique_ptr<raft::repository_t> m_raft;
#endif
Expand All @@ -75,28 +76,6 @@ class context_t {
// Service port mapping and pinning.
port_mapping_t mapper;

struct signals_t {
typedef signals::signal<void()> context_signals_t;
typedef cocaine::retroactive_signal<void(const actor_t& service)> service_signals_t;

struct {
// Fired on service creation, after service's thread is launched and is ready to accept
// and process new incoming connections.
service_signals_t exposed;

// Fired on service destruction, after the service was removed from its endpoints, but
// before the service object is actually destroyed.
service_signals_t removed;
} service;

// Fired first thing on context shutdown. This is a very good time to cleanup persistent
// connections, synchronize disk state and so on.
context_signals_t shutdown;
};

// Lifecycle management signals.
signals_t signals;

public:
context_t(config_t config, std::unique_ptr<logging::logger_t> logger);
~context_t();
Expand All @@ -119,6 +98,13 @@ class context_t {
auto
locate(const std::string& name) const -> boost::optional<const actor_t&>;

// Signals API

void
listen(const std::shared_ptr<dispatch<io::context_tag>>& slot, asio::io_service& asio) {
m_signals.listen(slot, asio);
}

// Network I/O

auto
Expand Down
140 changes: 97 additions & 43 deletions include/cocaine/context/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,72 +21,126 @@
#ifndef COCAINE_CONTEXT_SIGNAL_HPP
#define COCAINE_CONTEXT_SIGNAL_HPP

#include "cocaine/tuple.hpp"
#include "cocaine/locked_ptr.hpp"

#include "cocaine/rpc/dispatch.hpp"
#include "cocaine/rpc/frozen.hpp"

#include <algorithm>
#include <list>
#include <mutex>

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/signals2/signal.hpp>

namespace cocaine {

namespace signals = boost::signals2;
template<class Tag> class retroactive_signal;

template<
class Signature,
class IndexSequence = typename make_index_sequence<signals::signal<Signature>::arity>::type
>
class retroactive_signal;
namespace aux {

template<typename... Args, size_t... Indices>
class retroactive_signal<void(Args...), index_sequence<Indices...>> {
typedef signals::signal<void(Args...)> signal_type;
template<class Event>
struct async_visitor:
public boost::static_visitor<void>
{
typedef typename io::basic_slot<Event>::tuple_type tuple_type;

// The actual underlying signal object.
signal_type wrapped;
async_visitor(const tuple_type& args_, asio::io_service& asio_):
args(args_),
asio(asio_)
{ }

// Deferred signal arguments.
std::list<std::tuple<Args...>> mutable history;
std::mutex mutable mutex;
template<class Other>
result_type
operator()(const std::shared_ptr<io::basic_slot<Other>>& COCAINE_UNUSED_(slot)) const {
__builtin_unreachable();
}

public:
typedef typename signal_type::slot_type slot_type;
result_type
operator()(const std::shared_ptr<io::basic_slot<Event>>& slot) const {
auto args = this->args;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if args aren't copyable?
Okay, I see it doesn't make sense to use move-only args.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, history won't work in that case.


auto
connect(const slot_type& slot) -> signals::connection {
try {
std::lock_guard<std::mutex> guard(mutex);
asio.post([slot, args]() mutable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn it break something if you call asio.dispatch(...) instead of asio.post(...)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to make sure that signals are always executed from the event loop, never from the signal caller's context (which might lead to some weird errors).

(*slot)(std::move(args), upstream<void>());
});
}

std::for_each(history.begin(), history.end(), [&slot](const std::tuple<Args...>& args) {
slot(std::get<Indices>(args)...);
});
} catch(const signals::expired_slot& e) {
return signals::connection();
}
const tuple_type& args;
asio::io_service& asio;
};

return wrapped.connect(slot);
template<class Tag>
struct event_visitor:
public boost::static_visitor<void>
{
event_visitor(const std::shared_ptr<dispatch<Tag>>& slot_, asio::io_service& asio_):
slot(slot_),
asio(asio_)
{ }

template<class Event>
result_type
operator()(const io::frozen<Event>& event) const {
try {
slot->process(io::event_traits<Event>::id, async_visitor<Event>(event.tuple, asio));
} catch(const cocaine::error_t& e) {
// Ignore.
}
}

private:
const std::shared_ptr<dispatch<Tag>>& slot;
asio::io_service& asio;
};

} // namespace aux

template<class Tag>
class retroactive_signal {
typedef typename io::make_frozen_over<Tag>::type variant_type;

struct subscriber_t {
std::weak_ptr<dispatch<Tag>> slot;
asio::io_service& asio;
};

// Separately synchronized to keep the boost::signals2 guarantees.
synchronized<std::list<variant_type>> history;
synchronized<std::list<subscriber_t>> subscribers;

public:
void
operator()(Args&&... args) {
{
std::lock_guard<std::mutex> guard(mutex);
history.emplace_back(std::forward<Args>(args)...);
}
listen(const std::shared_ptr<dispatch<Tag>>& slot, asio::io_service& asio) {
subscribers->push_back(subscriber_t{slot, asio});

auto ptr = history.synchronize();

wrapped(std::forward<Args>(args)...);
std::for_each(ptr->begin(), ptr->end(), [&](const variant_type& event) {
boost::apply_visitor(aux::event_visitor<Tag>(slot, asio), event);
});
}

template<class Event, typename... Args>
void
operator()(Args&&... args) const {
{
std::lock_guard<std::mutex> guard(mutex);
history.emplace_back(std::forward<Args>(args)...);
invoke(Args&&... args) {
auto ptr = subscribers.synchronize();

for(auto it = ptr->begin(); it != ptr->end();) {
auto slot = it->slot.lock();

if(!slot) {
it = ptr->erase(it); continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oneliner!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oneliners are cool!

}

try {
slot->process(io::event_traits<Event>::id, aux::async_visitor<Event>(
typename io::basic_slot<Event>::tuple_type(std::forward<Args>(args)...),
it->asio
));
} catch(const cocaine::error_t& e) {
// Ignore.
}

++it;
}

wrapped(std::forward<Args>(args)...);
history->emplace_back(io::make_frozen<Event>(std::forward<Args>(args)...));
}
};

Expand Down
2 changes: 0 additions & 2 deletions include/cocaine/detail/actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

#include "cocaine/common.hpp"

#include <list>

#include <asio/io_service.hpp>
#include <asio/ip/tcp.hpp>

Expand Down
8 changes: 6 additions & 2 deletions include/cocaine/detail/service/locator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@

#include "cocaine/detail/service/locator/routing.hpp"

#include "cocaine/idl/context.hpp"
#include "cocaine/idl/locator.hpp"

#include "cocaine/rpc/dispatch.hpp"

#include "cocaine/locked_ptr.hpp"
Expand Down Expand Up @@ -69,7 +71,6 @@ class locator_t:
public dispatch<io::locator_tag>
{
class connect_client_t;
class cleanup_action_t;

context_t& m_context;

Expand Down Expand Up @@ -101,6 +102,9 @@ class locator_t:
// Used to resolve service names against routing groups, based on weights and other metrics.
std::map<std::string, continuum_t> m_groups;

// Slot for context signals.
std::shared_ptr<dispatch<io::context_tag>> m_signals;

public:
locator_t(context_t& context, asio::io_service& asio, const std::string& name, const dynamic_t& args);

Expand Down Expand Up @@ -147,7 +151,7 @@ class locator_t:
// Context signals

void
on_service(const actor_t& actor);
on_service(const std::string& name, const results::resolve& meta, bool active);

void
on_context_shutdown();
Expand Down
63 changes: 0 additions & 63 deletions include/cocaine/detail/waitable.hpp

This file was deleted.

1 change: 0 additions & 1 deletion include/cocaine/dynamic/dynamic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "cocaine/utility.hpp"

#include <string>
#include <utility>
#include <vector>

#include <boost/lexical_cast.hpp>
Expand Down
Loading