Skip to content

Commit

Permalink
[Locator] Thread safety.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Sibiryov committed Mar 31, 2015
1 parent 1f84a12 commit 1ae6756
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 529 deletions.
32 changes: 8 additions & 24 deletions include/cocaine/api/connect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ template<class Tag> class client;
namespace details {

class basic_client_t {
// Even though it's a shared pointer, clients do not share session ownership. The reason behind
// this is to avoid multiple clients being notified on session shutdown and trying to detach it.
std::shared_ptr<session_t> m_session;

public:
Expand All @@ -54,16 +52,12 @@ class basic_client_t {
// Observers

auto
session() const -> boost::optional<const session_t&>;

virtual
int
version() const = 0;
remote_endpoint() const -> asio::ip::tcp::endpoint;

// Modifiers

void
connect(std::unique_ptr<logging::log_t> log, std::unique_ptr<asio::ip::tcp::socket> socket);
attach(std::unique_ptr<logging::log_t> log, std::unique_ptr<asio::ip::tcp::socket> socket);
};

} // namespace details
Expand All @@ -72,8 +66,11 @@ template<class Tag>
class client:
public details::basic_client_t
{
template<class Event, bool = std::is_same<typename Event::tag, Tag>::value>
struct traits;

template<class Event>
struct traits {
struct traits<Event, true> {
typedef upstream<typename io::event_traits<Event>::dispatch_type> upstream_type;
typedef dispatch<typename io::event_traits<Event>::upstream_type> const dispatch_type;
};
Expand All @@ -82,11 +79,6 @@ class client:
template<class Event, typename... Args>
typename traits<Event>::upstream_type
invoke(const std::shared_ptr<typename traits<Event>::dispatch_type>& dispatch, Args&&... args) {
static_assert(
std::is_same<typename Event::tag, Tag>::value,
"message protocol is not compatible with this client"
);

if(!m_session) {
throw cocaine::error_t("client is not connected");
}
Expand All @@ -95,21 +87,13 @@ class client:
throw cocaine::error_t("callee has no upstreams specified");
}

// Get an untagged upstream. The message will be send directly using this upstream avoiding
// duplicate static validations in upstream<Tag>, because it's a little bit faster this way.
const io::upstream_ptr_t ptr = m_session->inject(dispatch);
const auto ptr = m_session->inject(dispatch);

// TODO: Locking?
// NOTE: No locking required: session synchronizes channels, hence no races.
ptr->template send<Event>(std::forward<Args>(args)...);

return ptr;
}

virtual
int
version() const {
return io::protocol<Tag>::version::value;
}
};

}} // namespace cocaine::api
Expand Down
91 changes: 0 additions & 91 deletions include/cocaine/api/resolve.hpp

This file was deleted.

38 changes: 19 additions & 19 deletions include/cocaine/detail/service/locator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#define COCAINE_LOCATOR_SERVICE_HPP

#include "cocaine/api/cluster.hpp"
#include "cocaine/api/resolve.hpp"
#include "cocaine/api/connect.hpp"
#include "cocaine/api/service.hpp"

#include "cocaine/detail/service/locator/routing.hpp"
Expand Down Expand Up @@ -70,7 +70,12 @@ class locator_t:
public api::cluster_t::interface,
public dispatch<io::locator_tag>
{
class connect_client_t;
class remote_t;

typedef std::map<std::string, continuum_t> router_map_t;

typedef std::map<std::string, api::client<io::locator_tag>> remote_map_t;
typedef std::map<std::string, streamed<results::connect>> stream_map_t;

context_t& m_context;

Expand All @@ -80,30 +85,25 @@ class locator_t:
// Cluster interconnections.
asio::io_service& m_asio;

// Remote sessions are created using this resolve.
// TODO: Since we don't resolve any remote services, drop this as a permanent member.
std::shared_ptr<api::resolve_t> m_resolve;

// Incoming sessions indexed by uuid. It is required to disambiguate between multiple different
// instances on the same host, even if the instance was restarted on the same port.
std::map<std::string, api::client<io::locator_tag>> m_remotes;

// Outgoing sessions indexed by uuid.
std::map<std::string, streamed<results::connect>> m_streams;
std::mutex m_mutex;

// Snapshot of the local service disposition.
std::map<std::string, results::resolve> m_snapshot;
// Slot for context signals.
std::shared_ptr<dispatch<io::context_tag>> m_signals;

// Clustering components.
std::unique_ptr<api::gateway_t> m_gateway;
std::shared_ptr<api::cluster_t> m_cluster;

// Used to resolve service names against routing groups, based on weights and other metrics.
std::map<std::string, continuum_t> m_groups;
synchronized<router_map_t> m_routers;

// Slot for context signals.
std::shared_ptr<dispatch<io::context_tag>> m_signals;
// Incoming sessions indexed by uuid. It is required to disambiguate between multiple different
// instances on the same host, even if the instance was restarted on the same port.
synchronized<remote_map_t> m_remotes;

// Outgoing sessions indexed by uuid.
synchronized<stream_map_t> m_streams;

// Snapshot of the local service disposition. Synchronized with outgoing streams.
std::map<std::string, results::resolve> m_snapshot;

public:
locator_t(context_t& context, asio::io_service& asio, const std::string& name, const dynamic_t& args);
Expand Down
Loading

0 comments on commit 1ae6756

Please sign in to comment.