diff --git a/include/cocaine/api/connect.hpp b/include/cocaine/api/connect.hpp index dbf9999b1..4b2f53baf 100644 --- a/include/cocaine/api/connect.hpp +++ b/include/cocaine/api/connect.hpp @@ -35,8 +35,6 @@ template class client; namespace details { class basic_client_t { - // Even though it's a shared pointer, clients do not share session ownership. The reason behind - // this is to avoid multiple clients being notified on session shutdown and trying to detach it. std::shared_ptr m_session; public: @@ -54,16 +52,12 @@ class basic_client_t { // Observers auto - session() const -> boost::optional; - - virtual - int - version() const = 0; + remote_endpoint() const -> asio::ip::tcp::endpoint; // Modifiers void - connect(std::unique_ptr log, std::unique_ptr socket); + attach(std::unique_ptr log, std::unique_ptr socket); }; } // namespace details @@ -72,8 +66,11 @@ template class client: public details::basic_client_t { + template::value> + struct traits; + template - struct traits { + struct traits { typedef upstream::dispatch_type> upstream_type; typedef dispatch::upstream_type> const dispatch_type; }; @@ -82,11 +79,6 @@ class client: template typename traits::upstream_type invoke(const std::shared_ptr::dispatch_type>& dispatch, Args&&... args) { - static_assert( - std::is_same::value, - "message protocol is not compatible with this client" - ); - if(!m_session) { throw cocaine::error_t("client is not connected"); } @@ -95,21 +87,13 @@ class client: throw cocaine::error_t("callee has no upstreams specified"); } - // Get an untagged upstream. The message will be send directly using this upstream avoiding - // duplicate static validations in upstream, because it's a little bit faster this way. - const io::upstream_ptr_t ptr = m_session->inject(dispatch); + const auto ptr = m_session->inject(dispatch); - // TODO: Locking? + // NOTE: No locking required: session synchronizes channels, hence no races. ptr->template send(std::forward(args)...); return ptr; } - - virtual - int - version() const { - return io::protocol::version::value; - } }; }} // namespace cocaine::api diff --git a/include/cocaine/api/resolve.hpp b/include/cocaine/api/resolve.hpp deleted file mode 100644 index c00a67a1c..000000000 --- a/include/cocaine/api/resolve.hpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - Copyright (c) 2013-2014 Andrey Goryachev - Copyright (c) 2014 Andrey Sibiryov - Copyright (c) 2011-2014 Other contributors as noted in the AUTHORS file. - - This file is part of Cocaine. - - Cocaine is free software; you can redistribute it and/or modify - it under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Cocaine is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef COCAINE_RESOLVE_HPP -#define COCAINE_RESOLVE_HPP - -#include "cocaine/api/connect.hpp" -#include "cocaine/idl/locator.hpp" - -namespace cocaine { namespace api { - -class resolve_t { - COCAINE_DECLARE_NONCOPYABLE(resolve_t) - - class resolve_action_t; - class connect_action_t; - - typedef asio::ip::tcp::endpoint endpoint_type; - typedef std::function handler_type; - - static const std::vector kDefaultEndpoints; - - const std::unique_ptr m_log; - - asio::io_service& m_asio; - client m_locator; - - struct pending_request_t { - std::shared_ptr dispatch; - std::string name; - }; - - std::deque m_pending; - -public: - resolve_t(std::unique_ptr log, asio::io_service& asio, - const std::vector& endpoints = kDefaultEndpoints); - - void - resolve(details::basic_client_t& client, const std::string& name, handler_type handle); - - void - connect(details::basic_client_t& client, const std::vector& endpoints, - handler_type handle); - -private: - void - resolve_pending(const std::error_code& ec); -}; - -}} // namespace cocaine::api - -namespace cocaine { namespace error { - -enum resolve_errors { - version_mismatch = 1 -}; - -auto -make_error_code(resolve_errors code) -> std::error_code; - -}} // namespace cocaine::error - -namespace std { - -template<> -struct is_error_code_enum: - public true_type -{ }; - -} // namespace std - -#endif diff --git a/include/cocaine/detail/service/locator.hpp b/include/cocaine/detail/service/locator.hpp index edc3502d0..a93a98d93 100644 --- a/include/cocaine/detail/service/locator.hpp +++ b/include/cocaine/detail/service/locator.hpp @@ -23,7 +23,7 @@ #define COCAINE_LOCATOR_SERVICE_HPP #include "cocaine/api/cluster.hpp" -#include "cocaine/api/resolve.hpp" +#include "cocaine/api/connect.hpp" #include "cocaine/api/service.hpp" #include "cocaine/detail/service/locator/routing.hpp" @@ -70,7 +70,12 @@ class locator_t: public api::cluster_t::interface, public dispatch { - class connect_client_t; + class remote_t; + + typedef std::map router_map_t; + + typedef std::map> remote_map_t; + typedef std::map> stream_map_t; context_t& m_context; @@ -80,30 +85,25 @@ class locator_t: // Cluster interconnections. asio::io_service& m_asio; - // Remote sessions are created using this resolve. - // TODO: Since we don't resolve any remote services, drop this as a permanent member. - std::shared_ptr m_resolve; - - // Incoming sessions indexed by uuid. It is required to disambiguate between multiple different - // instances on the same host, even if the instance was restarted on the same port. - std::map> m_remotes; - - // Outgoing sessions indexed by uuid. - std::map> m_streams; - std::mutex m_mutex; - - // Snapshot of the local service disposition. - std::map m_snapshot; + // Slot for context signals. + std::shared_ptr> m_signals; // Clustering components. std::unique_ptr m_gateway; std::shared_ptr m_cluster; // Used to resolve service names against routing groups, based on weights and other metrics. - std::map m_groups; + synchronized m_routers; - // Slot for context signals. - std::shared_ptr> m_signals; + // Incoming sessions indexed by uuid. It is required to disambiguate between multiple different + // instances on the same host, even if the instance was restarted on the same port. + synchronized m_remotes; + + // Outgoing sessions indexed by uuid. + synchronized m_streams; + + // Snapshot of the local service disposition. Synchronized with outgoing streams. + std::map m_snapshot; public: locator_t(context_t& context, asio::io_service& asio, const std::string& name, const dynamic_t& args); diff --git a/src/api.cpp b/src/api.cpp index ad5dc1e98..7eeeef7ba 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -18,35 +18,20 @@ along with this program. If not, see . */ -#include "cocaine/context.hpp" -#include "cocaine/logging.hpp" - #include "cocaine/api/connect.hpp" -#include "cocaine/api/resolve.hpp" #include "cocaine/api/storage.hpp" -#include "cocaine/rpc/asio/channel.hpp" - -#include "cocaine/traits/endpoint.hpp" -#include "cocaine/traits/graph.hpp" -#include "cocaine/traits/vector.hpp" +#include "cocaine/context.hpp" -#include -#include -#include -#include +#include "cocaine/rpc/asio/channel.hpp" -#include +using namespace cocaine; +using namespace cocaine::api::details; -using namespace asio; using namespace asio::ip; -using namespace blackhole; - // Connect -namespace cocaine { namespace api { namespace details { - basic_client_t::basic_client_t(basic_client_t&& other) { *this = std::move(other); } @@ -70,13 +55,17 @@ basic_client_t::operator=(basic_client_t&& rhs) { return *this; } -boost::optional -basic_client_t::session() const { - return boost::optional(m_session != nullptr, *m_session); +auto +basic_client_t::remote_endpoint() const -> tcp::endpoint { + if(!m_session) { + return tcp::endpoint(); + } else { + return m_session->remote_endpoint(); + } } void -basic_client_t::connect(std::unique_ptr log, std::unique_ptr socket) { +basic_client_t::attach(std::unique_ptr log, std::unique_ptr socket) { if(m_session) { throw cocaine::error_t("client is already connected"); } @@ -90,239 +79,6 @@ basic_client_t::connect(std::unique_ptr log, std::unique_ptrpull(); } -}}} // namespace cocaine::api::details - -using namespace cocaine::api; -using namespace cocaine::api::details; - -// Resolve internals - -class resolve_t::resolve_action_t: - public dispatch::upstream_type> -{ - resolve_t *const parent; - basic_client_t& client; - - // User-supplied completion handler. - handler_type handle; - -public: - resolve_action_t(resolve_t *const parent_, basic_client_t& client_, handler_type handle_): - dispatch::upstream_type>("resolve"), - parent(parent_), - client(client_), - handle(handle_) - { - typedef io::protocol::upstream_type>::scope protocol; - - using namespace std::placeholders; - - on(std::bind(&resolve_action_t::on_value, this, _1, _2, _3)); - on(std::bind(&resolve_action_t::on_error, this, _1, _2)); - } - - virtual - void - discard(const std::error_code& ec) const { - parent->m_asio.post(std::bind(handle, ec)); - } - -private: - void - on_value(const std::vector& endpoints, int version, const io::graph_root_t&) { - if(version != client.version()) { - parent->m_asio.post(std::bind(handle, error::version_mismatch)); - return; - } - - parent->connect(client, endpoints, handle); - } - - void - on_error(int code, const std::string& COCAINE_UNUSED_(reason)) { - parent->m_asio.post(std::bind(handle, static_cast(code))); - } -}; - -class resolve_t::connect_action_t: - public std::enable_shared_from_this -{ - typedef std::vector::const_iterator iterator_type; - - resolve_t *const parent; - basic_client_t& client; - - // Copied to keep the finalize() iterator valid. - std::vector endpoints; - - // User-supplied completion handler. - handler_type handle; - - // Used to bootstrap the client. - std::unique_ptr socket; - -public: - connect_action_t(resolve_t *const parent_, basic_client_t& client_, - const std::vector& endpoints_, handler_type handle_) - : - parent(parent_), - client(client_), - handle(handle_) - { - endpoints.assign(endpoints_.begin(), endpoints_.end()); - - // Will be properly disposed of on unsuccessful connection attempt. - socket = std::make_unique(parent->m_asio); - } - - void - operator()() { - async_connect(*socket, endpoints.begin(), endpoints.end(), std::bind(&connect_action_t::finalize, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2 - )); - } - -private: - void - finalize(const std::error_code& ec, iterator_type endpoint) { - if(!ec) { - auto client_log = std::make_unique(*parent->m_log, attribute::set_t({ - attribute::make("endpoint", boost::lexical_cast(*endpoint)) - })); - - try { - client.connect(std::move(client_log), std::move(socket)); - } catch(const std::system_error& e) { - // The socket might already be disconnected by this time. - parent->m_asio.post(std::bind(handle, e.code())); - return; - } - } else { - socket = nullptr; - } - - parent->m_asio.post(std::bind(handle, ec)); - } -}; - -// Resolve - -const std::vector resolve_t::kDefaultEndpoints = { - { address::from_string("127.0.0.1"), 10053 } -}; - -resolve_t::resolve_t(std::unique_ptr log, io_service& asio, - const std::vector& endpoints) -: - m_log(std::move(log)), - m_asio(asio) -{ - if(endpoints.empty()) { - return; - } - - std::ostringstream stream; - std::ostream_iterator builder(stream); - - boost::spirit::karma::generate(builder, boost::spirit::karma::stream % ", ", endpoints); - - COCAINE_LOG_DEBUG(m_log, "connecting to remote locator, trying: %s", stream.str()); - - connect(m_locator, endpoints, std::bind(&resolve_t::resolve_pending, - this, - std::placeholders::_1 - )); -} - -void -resolve_t::resolve(basic_client_t& client, const std::string& name, handler_type handle) { - auto dispatch = std::make_shared(this, client, handle); - - if(!m_locator.session()) { - m_pending.push_back({dispatch, name}); - } else { - m_locator.invoke(dispatch, name); - } -} - -void -resolve_t::connect(basic_client_t& client, const std::vector& endpoints, - handler_type handle) -{ - m_asio.dispatch(std::bind(&connect_action_t::operator(), - std::make_shared(this, client, endpoints, handle) - )); -} - -void -resolve_t::resolve_pending(const std::error_code& ec) { - if(ec) { - COCAINE_LOG_ERROR(m_log, "unable to connect to remote locator - [%d] %s", - ec.value(), ec.message() - ); - - for(auto it = m_pending.begin(); it != m_pending.end(); ++it) { - it->dispatch->discard(ec); - } - - m_pending.clear(); - } - - if(!m_pending.empty()) { - COCAINE_LOG_DEBUG(m_log, "resolving %d pending service(s)", m_pending.size()); - - for(auto it = m_pending.begin(); it != m_pending.end(); ++it) { - m_locator.invoke(it->dispatch, it->name); - } - - m_pending.clear(); - } -} - -namespace { - -// Resolve errors - -struct resolve_category_t: - public std::error_category -{ - virtual - auto - name() const throw() -> const char* { - return "cocaine.api.resolve"; - } - - virtual - auto - message(int code) const -> std::string { - switch(code) { - case cocaine::error::resolve_errors::version_mismatch: - return "service protocol version is not compatible with the client"; - } - - return "cocaine.api.resolve error"; - } -}; - -auto -resolve_category() -> const std::error_category& { - static resolve_category_t instance; - return instance; -} - -} // namespace - -namespace cocaine { namespace error { - -auto -make_error_code(resolve_errors code) -> std::error_code { - return std::error_code(static_cast(code), resolve_category()); -} - -}} // namespace cocaine::error - namespace cocaine { namespace api { // Storage diff --git a/src/service/locator.cpp b/src/service/locator.cpp index 83a5a3aac..4d43191fd 100644 --- a/src/service/locator.cpp +++ b/src/service/locator.cpp @@ -39,6 +39,8 @@ #include "cocaine/traits/map.hpp" #include "cocaine/traits/vector.hpp" +#include + #include #include @@ -58,9 +60,9 @@ using namespace cocaine::service; // Locator internals -class locator_t::connect_client_t: +class locator_t::remote_t: public dispatch::upstream_type>, - public std::enable_shared_from_this + public std::enable_shared_from_this { locator_t *const parent; std::string const uuid; @@ -69,8 +71,8 @@ class locator_t::connect_client_t: std::set active; public: - connect_client_t(locator_t *const parent_, const std::string& uuid_): - dispatch::upstream_type>(parent_->name()), + remote_t(locator_t *const parent_, const std::string& uuid_): + dispatch::upstream_type>(parent_->name() + ":remote"), parent(parent_), uuid(uuid_) { @@ -78,20 +80,17 @@ class locator_t::connect_client_t: using namespace std::placeholders; - on(std::bind(&connect_client_t::on_announce, this, _1, _2)); - on(std::bind(&connect_client_t::on_shutdown, this)); + on(std::bind(&remote_t::on_announce, this, _1, _2)); + on(std::bind(&remote_t::on_shutdown, this)); } virtual - ~connect_client_t() { + ~remote_t() { for(auto it = active.begin(); it != active.end(); ++it) { parent->m_gateway->cleanup(uuid, *it); } } - void - on_link(const std::error_code& ec); - virtual void discard(const std::error_code& ec) const; @@ -105,42 +104,10 @@ class locator_t::connect_client_t: }; void -locator_t::connect_client_t::on_link(const std::error_code& ec) { - scoped_attributes_t attributes(*parent->m_log, { - attribute::make("uuid", uuid) - }); - - if(ec) { - COCAINE_LOG_ERROR(parent->m_log, "unable to connect to remote node: [%d] %s", - ec.value(), ec.message() - ); - - // Safe to erase directly — client is detached. - parent->m_remotes.erase(uuid); - - return; - } - - if(!parent->m_remotes.count(uuid)) { - // Can happen if the cluter plugin decides to drop the remote node while the locator tries - // to connect to it. - COCAINE_LOG_ERROR(parent->m_log, "client has been dropped while connecting to remote node"); - return; - } - - auto& client = parent->m_remotes.at(uuid); - auto& session = client.session().get(); - - COCAINE_LOG_DEBUG(parent->m_log, "connected to remote node via %s", session.remote_endpoint()); - - client.invoke(shared_from_this(), parent->m_cfg.uuid); -} - -void -locator_t::connect_client_t::discard(const std::error_code& ec) const { +locator_t::remote_t::discard(const std::error_code& ec) const { if(ec.value() == 0) return; - COCAINE_LOG_ERROR(parent->m_log, "remote node has been discarded: [%d] %s", ec.value(), ec.message())( + COCAINE_LOG_ERROR(parent->m_log, "remote node discarded: [%d] %s", ec.value(), ec.message())( "uuid", uuid ); @@ -148,9 +115,12 @@ locator_t::connect_client_t::discard(const std::error_code& ec) const { } void -locator_t::connect_client_t::on_announce(const std::string& node, const std::map& update) { +locator_t::remote_t::on_announce(const std::string& node, + const std::map& update) +{ if(node != uuid) { - COCAINE_LOG_ERROR(parent->m_log, "remote node id mismatch: expected '%s', received '%s'", uuid, node); + COCAINE_LOG_ERROR(parent->m_log, "remote node id mismatch: '%s' vs. '%s'", uuid, node); + parent->drop_node(uuid); return; } @@ -163,7 +133,7 @@ locator_t::connect_client_t::on_announce(const std::string& node, const std::map if(endpoints.empty()) { parent->m_gateway->cleanup(uuid, it->first); - active.erase(it->first); + active.erase (it->first); } else { parent->m_gateway->consume(uuid, it->first, it->second); active.insert(it->first); @@ -179,14 +149,14 @@ locator_t::connect_client_t::on_announce(const std::string& node, const std::map update | boost::adaptors::map_keys ); - COCAINE_LOG_INFO(parent->m_log, "remote node has updated %d service(s): %s", update.size(), stream.str())( + COCAINE_LOG_INFO(parent->m_log, "remote node updated %d service(s): %s", update.size(), stream.str())( "uuid", uuid ); } void -locator_t::connect_client_t::on_shutdown() { - COCAINE_LOG_INFO(parent->m_log, "remote node has closed synchronization stream")( +locator_t::remote_t::on_shutdown() { + COCAINE_LOG_INFO(parent->m_log, "remote node closed synchronization stream")( "uuid", uuid ); @@ -209,8 +179,7 @@ locator_t::locator_t(context_t& context, io_service& asio, const std::string& na m_context(context), m_log(context.log(name)), m_cfg(name, root), - m_asio(asio), - m_resolve(new api::resolve_t(context.log(name + ":resolve"), asio, {})) + m_asio(asio) { using namespace std::placeholders; @@ -219,11 +188,6 @@ locator_t::locator_t(context_t& context, io_service& asio, const std::string& na on(std::bind(&locator_t::on_refresh, this, _1)); on(std::bind(&locator_t::on_cluster, this)); - // Context signals - - m_signals = std::make_shared>(name); - m_signals->on(std::bind(&locator_t::on_context_shutdown, this)); - // Service restrictions if(!m_cfg.restricted.empty()) { @@ -235,7 +199,12 @@ locator_t::locator_t(context_t& context, io_service& asio, const std::string& na COCAINE_LOG_INFO(m_log, "restricting %d service(s): %s", m_cfg.restricted.size(), stream.str()); } - // Initialize clustering components + // Context signals slot + + m_signals = std::make_shared>(name); + m_signals->on(std::bind(&locator_t::on_context_shutdown, this)); + + // Clustering components if(root.as_object().count("cluster")) { const auto conf = root.as_object().at("cluster").as_object(); @@ -285,7 +254,7 @@ locator_t::locator_t(context_t& context, io_service& asio, const std::string& na attribute::make("rg", *it) }); - m_groups.insert({ + m_routers.unsafe().insert({ *it, continuum_t(std::move(log), storage->get("groups", *it)) }); @@ -315,31 +284,60 @@ locator_t::asio() { void locator_t::link_node(const std::string& uuid, const std::vector& endpoints) { - if(!m_gateway || m_remotes.find(uuid) != m_remotes.end()) { + auto mapping = m_remotes.synchronize(); + + if(!m_gateway || mapping->count(uuid) != 0) { return; } - COCAINE_LOG_INFO(m_log, "starting synchronization with remote node")( + auto channel = std::make_shared(m_asio); + + asio::async_connect(*channel, endpoints.begin(), endpoints.end(), + [=](const std::error_code& ec, std::vector::const_iterator endpoint) + { + auto mapping = m_remotes.synchronize(); + + blackhole::scoped_attributes_t attributes(*m_log, { attribute::make("uuid", uuid) }); + + if(ec) { + COCAINE_LOG_ERROR(m_log, "unable to connect to a remote node: [%d] %s", + ec.value(), ec.message()); + return; + } else { + COCAINE_LOG_DEBUG(m_log, "connected to remote node via %s", *endpoint); + } + + auto& client = mapping->operator[](uuid); + + auto client_log = std::make_unique(*m_log, attribute::set_t({ + attribute::make("endpoint", boost::lexical_cast(*endpoint)) + })); + + client.attach(std::move(client_log), std::make_unique(std::move(*channel))); + client.invoke(std::make_shared(this, uuid), m_cfg.uuid); + }); + + COCAINE_LOG_INFO(m_log, "initiating link to remote node, %llu route(s)", endpoints.size())( "uuid", uuid ); - - m_resolve->connect(m_remotes[uuid], endpoints, std::bind(&connect_client_t::on_link, - std::make_shared(this, uuid), - std::placeholders::_1 - )); } void locator_t::drop_node(const std::string& uuid) { - if(!m_gateway || m_remotes.find(uuid) == m_remotes.end()) { - return; - } + m_remotes.apply([&](remote_map_t& mapping) { + if(!m_gateway || mapping.count(uuid) == 0) { + return; + } - COCAINE_LOG_INFO(m_log, "stopping synchronization with remote node")( - "uuid", uuid - ); + COCAINE_LOG_INFO(m_log, "stopping synchronization with remote node")( + "uuid", uuid + ); + + mapping.erase(uuid); + }); - m_remotes.erase(uuid); + // Chances are, this node is also connected back to us as a consumer. + m_streams->erase(uuid); } std::string @@ -349,19 +347,15 @@ locator_t::uuid() const { auto locator_t::on_resolve(const std::string& name, const std::string& seed) const -> results::resolve { - std::string remapped; - - if(m_groups.count(name)) { - remapped = seed.empty() ? m_groups.at(name).get() : m_groups.at(name).get(seed); - - COCAINE_LOG_DEBUG(m_log, "remapped service group '%s' to '%s'", name, remapped)( - "service", remapped - ); - } else { - remapped = name; - } + const auto remapped = m_routers.apply([&](const router_map_t& mapping) -> std::string { + if(!mapping.count(name)) { + return name; + } else { + return seed.empty() ? mapping.at(name).get() : mapping.at(name).get(seed); + } + }); - if(auto provided = m_context.locate(remapped)) { + if(const auto provided = m_context.locate(remapped)) { COCAINE_LOG_DEBUG(m_log, "providing service using local actor")( "service", remapped ); @@ -389,21 +383,19 @@ locator_t::on_connect(const std::string& uuid) -> streamed { return stream.close(); } - scoped_attributes_t attributes(*m_log, { - attribute::make("uuid", uuid) - }); + auto mapping = m_streams.synchronize(); - std::lock_guard guard(m_mutex); + scoped_attributes_t attributes(*m_log, { attribute::make("uuid", uuid) }); - if(m_streams.erase(uuid)) { + if(mapping->erase(uuid)) { COCAINE_LOG_WARNING(m_log, "replacing stale synchronization stream for remote node"); } else { COCAINE_LOG_INFO(m_log, "creating synchronization stream for remote node"); } - // Store the stream to synchronize future service updates with the remote node. Updates are sent - // out on context service signals, and, eventually, propagate to all nodes in the cluster. - m_streams.insert({uuid, stream}); + // Store the stream to synchronize future service updates with the remote node. Updates are + // sent out on context service signals, and propagate to all nodes in the cluster. + mapping->insert({uuid, stream}); if(m_snapshot.empty()) { return stream; @@ -432,12 +424,13 @@ locator_t::on_refresh(const std::vector& groups) { throw std::system_error(error::routing_storage_error); } + auto mapping = m_routers.synchronize(); + for(auto it = groups.begin(); it != groups.end(); ++it) { - // Group continuums can't be updated, only erased and constructed again. This simplifies the - // logic greatly and doesn't impose any performance penalty. - m_groups.erase(*it); + // Routing continuums can't be updated, only erased and reconstructed again. This simplifies + // the logic greatly and doesn't impose any significant performance penalty. + mapping->erase(*it); - // An extremely obscure way to save one function call! std::tie(lb, ub) = values.equal_range(*it); if(lb != ub) { @@ -445,19 +438,21 @@ locator_t::on_refresh(const std::vector& groups) { attribute::make("rg", *it) }); - m_groups.insert({*it, continuum_t(std::move(log), lb->second)}); + mapping->insert({*it, continuum_t(std::move(log), lb->second)}); } - } - COCAINE_LOG_INFO(m_log, "updated %d active group(s)", values.size()); + COCAINE_LOG_INFO(m_log, "%s routing group %s", lb != ub ? "updated" : "removed", *it); + }; } auto locator_t::on_cluster() const -> results::cluster { results::cluster result; - for(auto it = m_remotes.begin(); it != m_remotes.end(); ++it) { - result[it->first] = it->second.session().get().remote_endpoint(); + auto mapping = m_remotes.synchronize(); + + for(auto it = mapping->begin(); it != mapping->end(); ++it) { + result[it->first] = it->second.remote_endpoint(); } return result; @@ -469,26 +464,23 @@ locator_t::on_service(const std::string& name, const results::resolve& meta, boo return; } - const auto response = results::connect { - m_cfg.uuid, {{ name, meta }} - }; - - std::lock_guard guard(m_mutex); - - COCAINE_LOG_DEBUG(m_log, "synchronizing service state with %d remote node(s)", m_streams.size())( - "service", name - ); + auto mapping = m_streams.synchronize(); - for(auto it = m_streams.begin(); it != m_streams.end();) { - try { - it->second.write(response); ++it; - } catch(...) { - COCAINE_LOG_INFO(m_log, "removing synchronization stream for remote node")( - "uuid", it->first - ); + if(!mapping->empty()) { + const auto response = results::connect { m_cfg.uuid, {{ name, meta }} }; - it = m_streams.erase(it); + for(auto it = mapping->begin(); it != mapping->end();) { + try { + it->second.write(response); + it++; + } catch(...) { + it = mapping->erase(it); + } } + + COCAINE_LOG_DEBUG(m_log, "synchronized metadata with %llu remote nodes", mapping->size())( + "service", name + ); } if(active) { @@ -500,35 +492,38 @@ locator_t::on_service(const std::string& name, const results::resolve& meta, boo void locator_t::on_context_shutdown() { - std::lock_guard guard(m_mutex); + m_streams.apply([this](stream_map_t& mapping) { + if(mapping.empty()) { + return; + } else { + COCAINE_LOG_DEBUG(m_log, "cleaning up %d remote node stream(s)", mapping.size()); + } - COCAINE_LOG_DEBUG(m_log, "closing %d remote node synchronization stream(s)", m_streams.size()); + for(auto it = mapping.begin(); it != mapping.end();) { + try { + it->second.close(); + } catch(...) { + // Ignore all exceptions. The runtime is being destroyed anyway. + } - for(auto it = m_streams.begin(); it != m_streams.end();) { - try { - it->second.close(); - } catch(...) { - // Ignore all exceptions. The runtime is being destroyed anyway. + it = mapping.erase(it); } + }); - it = m_streams.erase(it); - } - - COCAINE_LOG_DEBUG(m_log, "cleaning up %d remote node client(s)", m_remotes.size()); + m_remotes.apply([this](remote_map_t& mapping) { + if(mapping.empty()) { + return; + } else { + COCAINE_LOG_DEBUG(m_log, "cleaning up %d remote node client(s)", mapping.size()); + } - // Disconnect all the remote nodes. - m_remotes.clear(); + // Disconnect all the remote nodes. + mapping.clear(); + }); COCAINE_LOG_DEBUG(m_log, "shutting down distributed components"); - // Destroy the clustering stuff. - m_gateway = nullptr; m_cluster = nullptr; - - // Destroy the loopback locator connection. - m_resolve = nullptr; - - // Disconnect the signals. m_signals = nullptr; }