diff --git a/lib/inc/Channel.h b/lib/inc/Channel.h new file mode 100644 index 000000000000..26fc2f5eca1b --- /dev/null +++ b/lib/inc/Channel.h @@ -0,0 +1,76 @@ +#pragma once + +#include "swss/producertable.h" +#include "swss/consumertable.h" +#include "swss/notificationconsumer.h" +#include "swss/selectableevent.h" +#include "swss/sal.h" + +extern "C" { +#include "sai.h" +} + +#include +#include + +namespace sairedis +{ + class Channel + { + public: + + typedef std::function&)> Callback; + + public: + + Channel( + _In_ Callback callback); + + virtual ~Channel(); + + public: + + virtual void setBuffered( + _In_ bool buffered) = 0; + + virtual void flush() = 0; + + virtual void set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& command) = 0; + + virtual void del( + _In_ const std::string& key, + _In_ const std::string& command) = 0; + + virtual sai_status_t wait( + _In_ const std::string& command, + _Out_ swss::KeyOpFieldsValuesTuple& kco) = 0; + + protected: + + virtual void notificationThreadFunction() = 0; + + protected: + + Callback m_callback; + + protected: // notification + + /** + * @brief Indicates whether notification thread should be running. + */ + volatile bool m_runNotificationThread; + + /** + * @brief Event used to nice end notifications thread. + */ + swss::SelectableEvent m_notificationThreadShouldEndEvent; + + /** + * @brief Notification thread + */ + std::shared_ptr m_notificationThread; + }; +} diff --git a/lib/inc/ContextConfig.h b/lib/inc/ContextConfig.h index 9d65ffac3f60..157940222925 100644 --- a/lib/inc/ContextConfig.h +++ b/lib/inc/ContextConfig.h @@ -38,6 +38,12 @@ namespace sairedis std::string m_dbState; + bool m_zmqEnable; + + std::string m_zmqEndpoint; + + std::string m_zmqNtfEndpoint; + std::shared_ptr m_scc; }; } diff --git a/lib/inc/RedisChannel.h b/lib/inc/RedisChannel.h index 1e84173c5b5a..05d628cc889a 100644 --- a/lib/inc/RedisChannel.h +++ b/lib/inc/RedisChannel.h @@ -1,5 +1,6 @@ #pragma once +#include "Channel.h" #include "RemoteSaiInterface.h" #include "SwitchContainer.h" #include "VirtualObjectIdManager.h" @@ -17,17 +18,14 @@ namespace sairedis { - class RedisChannel + class RedisChannel: + public Channel { - public: - - typedef std::function&)> Callback; - public: RedisChannel( _In_ const std::string& dbAsic, - _In_ Callback callback); + _In_ Channel::Callback callback); virtual ~RedisChannel(); @@ -35,31 +33,27 @@ namespace sairedis std::shared_ptr getDbConnector() const; - void setBuffered( - _In_ bool buffered); + virtual void setBuffered( + _In_ bool buffered) override; - void flush(); + virtual void flush() override; - void set( + virtual void set( _In_ const std::string& key, _In_ const std::vector& values, - _In_ const std::string& command); + _In_ const std::string& command) override; - void del( + virtual void del( _In_ const std::string& key, - _In_ const std::string& command); + _In_ const std::string& command) override; - sai_status_t wait( + virtual sai_status_t wait( _In_ const std::string& command, - _Out_ swss::KeyOpFieldsValuesTuple& kco); - - private: - - void notificationThreadFunction(); + _Out_ swss::KeyOpFieldsValuesTuple& kco) override; - private: + protected: - Callback m_callback; + virtual void notificationThreadFunction() override; private: @@ -85,11 +79,6 @@ namespace sairedis private: // notification - /** - * @brief Indicates whether notification thread should be running. - */ - volatile bool m_runNotificationThread; - /** * @brief Database connector used for notifications. */ @@ -99,15 +88,5 @@ namespace sairedis * @brief Notification consumer. */ std::shared_ptr m_notificationConsumer; - - /** - * @brief Event used to nice end notifications thread. - */ - swss::SelectableEvent m_notificationThreadShouldEndEvent; - - /** - * @brief Notification thread - */ - std::shared_ptr m_notificationThread; }; } diff --git a/lib/inc/RedisRemoteSaiInterface.h b/lib/inc/RedisRemoteSaiInterface.h index 12d624ae27c1..ba64c5dfb8de 100644 --- a/lib/inc/RedisRemoteSaiInterface.h +++ b/lib/inc/RedisRemoteSaiInterface.h @@ -9,6 +9,7 @@ #include "SkipRecordAttrContainer.h" #include "RedisChannel.h" #include "SwitchConfigContainer.h" +#include "ContextConfig.h" #include "swss/producertable.h" #include "swss/consumertable.h" @@ -72,9 +73,7 @@ namespace sairedis public: RedisRemoteSaiInterface( - _In_ uint32_t globalContext, - _In_ std::shared_ptr scc, - _In_ const std::string& dbAsic, + _In_ std::shared_ptr contextConfig, _In_ std::function)> notificationCallback, _In_ std::shared_ptr recorder); @@ -438,9 +437,7 @@ namespace sairedis private: - uint32_t m_globalContext; - - std::shared_ptr m_switchConfigContainer; + std::shared_ptr m_contextConfig; bool m_asicInitViewMode; @@ -456,18 +453,18 @@ namespace sairedis std::shared_ptr m_virtualObjectIdManager; + std::shared_ptr m_db; + std::shared_ptr m_redisVidIndexGenerator; std::weak_ptr m_meta; std::shared_ptr m_skipRecordAttrContainer; - std::shared_ptr m_redisChannel; + std::shared_ptr m_communicationChannel; std::function)> m_notificationCallback; - std::string m_dbAsic; - std::map m_tableDump; }; } diff --git a/lib/inc/ZeroMQChannel.h b/lib/inc/ZeroMQChannel.h new file mode 100644 index 000000000000..28ef92885c4e --- /dev/null +++ b/lib/inc/ZeroMQChannel.h @@ -0,0 +1,68 @@ +#pragma once + +#include "Channel.h" + +#include "swss/producertable.h" +#include "swss/consumertable.h" +#include "swss/notificationconsumer.h" +#include "swss/selectableevent.h" + +#include +#include + +namespace sairedis +{ + class ZeroMQChannel: + public Channel + { + public: + + ZeroMQChannel( + _In_ const std::string& endpoint, + _In_ const std::string& ntfEndpoint, + _In_ Channel::Callback callback); + + virtual ~ZeroMQChannel(); + + public: + + virtual void setBuffered( + _In_ bool buffered) override; + + virtual void flush() override; + + virtual void set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& command) override; + + virtual void del( + _In_ const std::string& key, + _In_ const std::string& command) override; + + virtual sai_status_t wait( + _In_ const std::string& command, + _Out_ swss::KeyOpFieldsValuesTuple& kco) override; + + protected: + + virtual void notificationThreadFunction() override; + + private: + + std::string m_endpoint; + + std::string m_ntfEndpoint; + + std::vector m_buffer; + + void* m_context; + + void* m_socket; + + void* m_ntfContext; + + void* m_ntfSocket; + + }; +} diff --git a/lib/src/Channel.cpp b/lib/src/Channel.cpp new file mode 100644 index 000000000000..50b1e870f4fa --- /dev/null +++ b/lib/src/Channel.cpp @@ -0,0 +1,21 @@ +#include "Channel.h" + +#include "swss/logger.h" + +using namespace sairedis; + +Channel::Channel( + _In_ Callback callback): + m_callback(callback) +{ + SWSS_LOG_ENTER(); + + // empty +} + +Channel::~Channel() +{ + SWSS_LOG_ENTER(); + + // empty +} diff --git a/lib/src/Context.cpp b/lib/src/Context.cpp index 3e6715671dd0..19f1843ab065 100644 --- a/lib/src/Context.cpp +++ b/lib/src/Context.cpp @@ -17,9 +17,7 @@ Context::Context( // will create notification thread m_redisSai = std::make_shared( - m_contextConfig->m_guid, - m_contextConfig->m_scc, - m_contextConfig->m_dbAsic, + m_contextConfig, std::bind(&Context::handle_notification, this, _1), m_recorder); diff --git a/lib/src/ContextConfig.cpp b/lib/src/ContextConfig.cpp index 43a2c7b9e7af..6817bfe24c8e 100644 --- a/lib/src/ContextConfig.cpp +++ b/lib/src/ContextConfig.cpp @@ -16,7 +16,8 @@ ContextConfig::ContextConfig( m_dbAsic(dbAsic), m_dbCounters(dbCounters), m_dbFlex(dbFlex), - m_dbState(dbState) + m_dbState(dbState), + m_zmqEnable(false) { SWSS_LOG_ENTER(); diff --git a/lib/src/ContextConfigContainer.cpp b/lib/src/ContextConfigContainer.cpp index 40ef05acc357..551162381307 100644 --- a/lib/src/ContextConfigContainer.cpp +++ b/lib/src/ContextConfigContainer.cpp @@ -35,9 +35,9 @@ std::shared_ptr ContextConfigContainer::getDefault() auto sc = std::make_shared(0, ""); cc->insert(sc); - + ccc->m_map[0] = cc; - + return ccc; } @@ -113,6 +113,15 @@ std::shared_ptr ContextConfigContainer::loadFromFile( auto cc = std::make_shared(guid, name, dbAsic, dbCounters, dbFlex, dbState); + cc->m_zmqEnable = item["zmq_enable"]; + cc->m_zmqEndpoint = item["zmq_endpoint"]; + cc->m_zmqNtfEndpoint = item["zmq_ntf_endpoint"]; + + SWSS_LOG_NOTICE("contextConfig zmq enable %s, endpoint: %s, ntf endpoint: %s", + (cc->m_zmqEnable) ? "true" : "false", + cc->m_zmqEndpoint.c_str(), + cc->m_zmqNtfEndpoint.c_str()); + for (size_t k = 0; k < item["switches"].size(); k++) { json& sw = item["switches"][k]; diff --git a/lib/src/Makefile.am b/lib/src/Makefile.am index f6a002c13c28..eb84b7244130 100644 --- a/lib/src/Makefile.am +++ b/lib/src/Makefile.am @@ -10,6 +10,8 @@ lib_LTLIBRARIES = libsairedis.la noinst_LIBRARIES = libSaiRedis.a libSaiRedis_a_SOURCES = \ + ZeroMQChannel.cpp \ + Channel.cpp \ Context.cpp \ ContextConfigContainer.cpp \ ContextConfig.cpp \ @@ -92,7 +94,7 @@ bin_PROGRAMS = tests tests_SOURCES = tests.cpp tests_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) -tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la +tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la -lzmq TESTS = tests diff --git a/lib/src/RedisChannel.cpp b/lib/src/RedisChannel.cpp index ac7a3baea68d..99d33c6e53d3 100644 --- a/lib/src/RedisChannel.cpp +++ b/lib/src/RedisChannel.cpp @@ -16,8 +16,8 @@ using namespace sairedis; RedisChannel::RedisChannel( _In_ const std::string& dbAsic, - _In_ Callback callback): - m_callback(callback), + _In_ Channel::Callback callback): + Channel(callback), m_dbAsic(dbAsic) { SWSS_LOG_ENTER(); @@ -48,7 +48,11 @@ RedisChannel::~RedisChannel() // notify thread that it should end m_notificationThreadShouldEndEvent.notify(); + SWSS_LOG_NOTICE("join ntf thread begin"); + m_notificationThread->join(); + + SWSS_LOG_NOTICE("join ntf thread end"); } std::shared_ptr RedisChannel::getDbConnector() const @@ -210,4 +214,3 @@ sai_status_t RedisChannel::wait( return SAI_STATUS_FAILURE; } - diff --git a/lib/src/RedisRemoteSaiInterface.cpp b/lib/src/RedisRemoteSaiInterface.cpp index ec045b54fdfc..429b803dc8bc 100644 --- a/lib/src/RedisRemoteSaiInterface.cpp +++ b/lib/src/RedisRemoteSaiInterface.cpp @@ -5,6 +5,7 @@ #include "VirtualObjectIdManager.h" #include "SkipRecordAttrContainer.h" #include "SwitchContainer.h" +#include "ZeroMQChannel.h" #include "sairediscommon.h" @@ -26,16 +27,12 @@ std::vector serialize_counter_id_list( _In_ const sai_stat_id_t *counter_id_list); RedisRemoteSaiInterface::RedisRemoteSaiInterface( - _In_ uint32_t globalContext, - _In_ std::shared_ptr scc, - _In_ const std::string& dbAsic, + _In_ std::shared_ptr contextConfig, _In_ std::function)> notificationCallback, _In_ std::shared_ptr recorder): - m_globalContext(globalContext), - m_switchConfigContainer(scc), + m_contextConfig(contextConfig), m_recorder(recorder), - m_notificationCallback(notificationCallback), - m_dbAsic(dbAsic) + m_notificationCallback(notificationCallback) { SWSS_LOG_ENTER(); @@ -73,13 +70,27 @@ sai_status_t RedisRemoteSaiInterface::initialize( m_useTempView = false; m_syncMode = false; - m_redisChannel = std::make_shared( - m_dbAsic, - std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3)); + if (m_contextConfig->m_zmqEnable) + { + m_communicationChannel = std::make_shared( + m_contextConfig->m_zmqEndpoint, + m_contextConfig->m_zmqNtfEndpoint, + std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3)); + + SWSS_LOG_NOTICE("zmq enabled, forcing sync mode"); + + m_syncMode = true; + } + else + { + m_communicationChannel = std::make_shared( + m_contextConfig->m_dbAsic, + std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3)); + } - auto db = m_redisChannel->getDbConnector(); + m_db = std::make_shared(m_contextConfig->m_dbAsic, 0); - m_redisVidIndexGenerator = std::make_shared(db, REDIS_KEY_VIDCOUNTER); + m_redisVidIndexGenerator = std::make_shared(m_db, REDIS_KEY_VIDCOUNTER); clear_local_state(); @@ -103,7 +114,7 @@ sai_status_t RedisRemoteSaiInterface::uninitialize(void) return SAI_STATUS_FAILURE; } - m_redisChannel = nullptr; // will stop thread + m_communicationChannel = nullptr; // will stop thread // clear local state after stopping threads @@ -344,11 +355,18 @@ sai_status_t RedisRemoteSaiInterface::setRedisExtensionAttribute( m_syncMode = attr->value.booldata; + if (m_contextConfig->m_zmqEnable) + { + SWSS_LOG_NOTICE("zmq enabled, forcing sync mode"); + + m_syncMode = true; + } + if (m_syncMode) { SWSS_LOG_NOTICE("disabling buffered pipeline in sync mode"); - m_redisChannel->setBuffered(false); + m_communicationChannel->setBuffered(false); } return SAI_STATUS_SUCCESS; @@ -362,13 +380,13 @@ sai_status_t RedisRemoteSaiInterface::setRedisExtensionAttribute( return SAI_STATUS_NOT_SUPPORTED; } - m_redisChannel->setBuffered(attr->value.booldata); + m_communicationChannel->setBuffered(attr->value.booldata); return SAI_STATUS_SUCCESS; case SAI_REDIS_SWITCH_ATTR_FLUSH: - m_redisChannel->flush(); + m_communicationChannel->flush(); return SAI_STATUS_SUCCESS; @@ -538,7 +556,7 @@ sai_status_t RedisRemoteSaiInterface::create( m_recorder->recordGenericCreate(key, entry); - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_CREATE); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_CREATE); auto status = waitForResponse(SAI_COMMON_API_CREATE); @@ -561,7 +579,7 @@ sai_status_t RedisRemoteSaiInterface::remove( m_recorder->recordGenericRemove(key); - m_redisChannel->del(key, REDIS_ASIC_STATE_COMMAND_REMOVE); + m_communicationChannel->del(key, REDIS_ASIC_STATE_COMMAND_REMOVE); auto status = waitForResponse(SAI_COMMON_API_REMOVE); @@ -591,7 +609,7 @@ sai_status_t RedisRemoteSaiInterface::set( m_recorder->recordGenericSet(key, entry); - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_SET); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_SET); auto status = waitForResponse(SAI_COMMON_API_SET); @@ -609,7 +627,7 @@ sai_status_t RedisRemoteSaiInterface::waitForResponse( { swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); return status; } @@ -631,7 +649,7 @@ sai_status_t RedisRemoteSaiInterface::waitForGetResponse( swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); auto &values = kfvFieldsValues(kco); @@ -685,7 +703,7 @@ sai_status_t RedisRemoteSaiInterface::get( // get is special, it will not put data // into asic view, only to message queue - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_GET); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_GET); auto status = waitForGetResponse(objectType, attr_count, attr_list); @@ -726,7 +744,7 @@ sai_status_t RedisRemoteSaiInterface::waitForFlushFdbEntriesResponse() swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_FLUSHRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_FLUSHRESPONSE, kco); return status; } @@ -754,7 +772,7 @@ sai_status_t RedisRemoteSaiInterface::flushFdbEntries( m_recorder->recordFlushFdbEntries(switchId, attrCount, attrList); // TODO m_recorder->recordFlushFdbEntries(key, entry) - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_FLUSH); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_FLUSH); auto status = waitForFlushFdbEntriesResponse(); @@ -790,7 +808,7 @@ sai_status_t RedisRemoteSaiInterface::objectTypeGetAvailability( // This query will not put any data into the ASIC view, just into the // message queue - m_redisChannel->set(strSwitchId, entry, REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_QUERY); + m_communicationChannel->set(strSwitchId, entry, REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_QUERY); auto status = waitForObjectTypeGetAvailabilityResponse(count); @@ -806,7 +824,7 @@ sai_status_t RedisRemoteSaiInterface::waitForObjectTypeGetAvailabilityResponse( swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_RESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_RESPONSE, kco); if (status == SAI_STATUS_SUCCESS) { @@ -866,7 +884,7 @@ sai_status_t RedisRemoteSaiInterface::queryAttributeCapability( m_recorder->recordQueryAttributeCapability(switchId, objectType, attrId, capability); - m_redisChannel->set(switchIdStr, entry, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_QUERY); + m_communicationChannel->set(switchIdStr, entry, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_QUERY); auto status = waitForQueryAttributeCapabilityResponse(capability); @@ -882,7 +900,7 @@ sai_status_t RedisRemoteSaiInterface::waitForQueryAttributeCapabilityResponse( swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE, kco); if (status == SAI_STATUS_SUCCESS) { @@ -955,7 +973,7 @@ sai_status_t RedisRemoteSaiInterface::queryAattributeEnumValuesCapability( m_recorder->recordQueryAattributeEnumValuesCapability(switchId, objectType, attrId, enumValuesCapability); - m_redisChannel->set(switch_id_str, entry, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_QUERY); + m_communicationChannel->set(switch_id_str, entry, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_QUERY); auto status = waitForQueryAattributeEnumValuesCapabilityResponse(enumValuesCapability); @@ -971,7 +989,7 @@ sai_status_t RedisRemoteSaiInterface::waitForQueryAattributeEnumValuesCapability swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE, kco); if (status == SAI_STATUS_SUCCESS) { @@ -1045,7 +1063,7 @@ sai_status_t RedisRemoteSaiInterface::getStats( // get_stats will not put data to asic view, only to message queue - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_GET_STATS); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_GET_STATS); return waitForGetStatsResponse(number_of_counters, counters); } @@ -1058,7 +1076,7 @@ sai_status_t RedisRemoteSaiInterface::waitForGetStatsResponse( swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); if (status == SAI_STATUS_SUCCESS) { @@ -1117,7 +1135,7 @@ sai_status_t RedisRemoteSaiInterface::clearStats( m_recorder->recordGenericClearStats(object_type, object_id, number_of_counters, counter_ids); - m_redisChannel->set(key, values, REDIS_ASIC_STATE_COMMAND_CLEAR_STATS); + m_communicationChannel->set(key, values, REDIS_ASIC_STATE_COMMAND_CLEAR_STATS); auto status = waitForClearStatsResponse(); @@ -1132,7 +1150,7 @@ sai_status_t RedisRemoteSaiInterface::waitForClearStatsResponse() swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); return status; } @@ -1174,7 +1192,7 @@ sai_status_t RedisRemoteSaiInterface::bulkRemove( m_recorder->recordBulkGenericRemove(serializedObjectType, entries); - m_redisChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_REMOVE); + m_communicationChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_REMOVE); return waitForBulkResponse(SAI_COMMON_API_BULK_REMOVE, (uint32_t)serialized_object_ids.size(), object_statuses); } @@ -1190,7 +1208,7 @@ sai_status_t RedisRemoteSaiInterface::waitForBulkResponse( { swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco); auto &values = kfvFieldsValues(kco); @@ -1407,7 +1425,7 @@ sai_status_t RedisRemoteSaiInterface::bulkSet( m_recorder->recordBulkGenericSet(serializedObjectType, entries); - m_redisChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_SET); + m_communicationChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_SET); return waitForBulkResponse(SAI_COMMON_API_BULK_SET, (uint32_t)serialized_object_ids.size(), object_statuses); } @@ -1506,7 +1524,7 @@ sai_status_t RedisRemoteSaiInterface::bulkCreate( m_recorder->recordBulkGenericCreate(str_object_type, entries); - m_redisChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_CREATE); + m_communicationChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_CREATE); return waitForBulkResponse(SAI_COMMON_API_BULK_CREATE, (uint32_t)serialized_object_ids.size(), object_statuses); } @@ -1627,7 +1645,7 @@ sai_status_t RedisRemoteSaiInterface::notifySyncd( m_recorder->recordNotifySyncd(switchId, redisNotifySyncd); - m_redisChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_NOTIFY); + m_communicationChannel->set(key, entry, REDIS_ASIC_STATE_COMMAND_NOTIFY); auto status = waitForNotifySyncdResponse(); @@ -1642,7 +1660,7 @@ sai_status_t RedisRemoteSaiInterface::waitForNotifySyncdResponse() swss::KeyOpFieldsValuesTuple kco; - auto status = m_redisChannel->wait(REDIS_ASIC_STATE_COMMAND_NOTIFY, kco); + auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_NOTIFY, kco); return status; } @@ -1798,8 +1816,8 @@ void RedisRemoteSaiInterface::clear_local_state() m_virtualObjectIdManager = std::make_shared( - m_globalContext, - m_switchConfigContainer, + m_contextConfig->m_guid, + m_contextConfig->m_scc, m_redisVidIndexGenerator); auto meta = m_meta.lock(); @@ -1868,9 +1886,7 @@ void RedisRemoteSaiInterface::refreshTableDump() SWSS_LOG_TIMER("get asic view from %s", ASIC_STATE_TABLE); - auto db = m_redisChannel->getDbConnector(); - - swss::Table table(db.get(), ASIC_STATE_TABLE); + swss::Table table(m_db.get(), ASIC_STATE_TABLE); swss::TableDump dump; diff --git a/lib/src/ZeroMQChannel.cpp b/lib/src/ZeroMQChannel.cpp new file mode 100644 index 000000000000..85b4ba460bae --- /dev/null +++ b/lib/src/ZeroMQChannel.cpp @@ -0,0 +1,324 @@ +#include "ZeroMQChannel.h" + +#include "sairediscommon.h" + +#include "meta/sai_serialize.h" + +#include "swss/logger.h" +#include "swss/select.h" + +#include + +using namespace sairedis; + +/** + * @brief Get response timeout in milliseconds. + */ +#define ZMQ_GETRESPONSE_TIMEOUT_MS (60*1000) + +#define ZMQ_RESPONSE_BUFFER_SIZE (4*1024*1024) + +ZeroMQChannel::ZeroMQChannel( + _In_ const std::string& endpoint, + _In_ const std::string& ntfEndpoint, + _In_ Channel::Callback callback): + Channel(callback), + m_endpoint(endpoint), + m_ntfEndpoint(ntfEndpoint), + m_context(nullptr), + m_socket(nullptr), + m_ntfContext(nullptr), + m_ntfSocket(nullptr) +{ + SWSS_LOG_ENTER(); + + m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + + // configure ZMQ for main communication + + m_context = zmq_ctx_new(); + + m_socket = zmq_socket(m_context, ZMQ_REQ); + + SWSS_LOG_NOTICE("opening zmq main endpoint: %s", endpoint.c_str()); + + int rc = zmq_connect(m_socket, endpoint.c_str()); + + if (rc != 0) + { + SWSS_LOG_THROW("failed to open zmq main endpoint %s, zmqerrno: %d", + endpoint.c_str(), + zmq_errno()); + } + + // configure ZMQ notification endpoint + + m_ntfContext = zmq_ctx_new(); + + m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_SUB); + + SWSS_LOG_NOTICE("opening zmq ntf endpoint: %s", ntfEndpoint.c_str()); + + rc = zmq_connect(m_ntfSocket, ntfEndpoint.c_str()); + + if (rc != 0) + { + SWSS_LOG_THROW("failed to open zmq ntf endpoint %s, zmqerrno: %d", + ntfEndpoint.c_str(), + zmq_errno()); + } + + rc = zmq_setsockopt(m_ntfSocket, ZMQ_SUBSCRIBE, "", 0); + + if (rc != 0) + { + SWSS_LOG_THROW("failed to set sock opt ZMQ_SUBSCRIBE on ntf endpoint %s, zmqerrno: %d", + ntfEndpoint.c_str(), + zmq_errno()); + } + + // start thread + + m_runNotificationThread = true; + + SWSS_LOG_NOTICE("creating notification thread"); + + m_notificationThread = std::make_shared(&ZeroMQChannel::notificationThreadFunction, this); +} + +ZeroMQChannel::~ZeroMQChannel() +{ + SWSS_LOG_ENTER(); + + m_runNotificationThread = false; + + zmq_close(m_socket); + zmq_ctx_destroy(m_context); + + // when zmq context is destroyed, zmq_recv will be interrupted and errno + // will be set to ETERM, so we don't need actual FD to be used in + // selectable event + + zmq_close(m_ntfSocket); + zmq_ctx_destroy(m_ntfContext); + + SWSS_LOG_NOTICE("join ntf thread begin"); + + m_notificationThread->join(); + + SWSS_LOG_NOTICE("join ntf thread end"); +} + +void ZeroMQChannel::notificationThreadFunction() +{ + SWSS_LOG_ENTER(); + + SWSS_LOG_NOTICE("start listening for notifications"); + + std::vector buffer; + + buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + + while (m_runNotificationThread) + { + // NOTE: this entire loop internal could be encapsulated into separate class + // which will inherit from Selectable class, and name this as ntf receiver + + int rc = zmq_recv(m_ntfSocket, buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + + if (rc <= 0 && zmq_errno() == ETERM) + { + SWSS_LOG_NOTICE("zmq_recv interrupted with ETERM, ending thread"); + break; + } + + if (rc < 0) + { + SWSS_LOG_ERROR("zmq_recv failed, zmqerrno: %d", zmq_errno()); + + // at this point we don't know if next zmq_recv will succeed + + continue; + } + + if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + { + SWSS_LOG_WARN("zmq_recv message was turncated (over %d bytes, received %d), increase buffer size, message DROPPED", + ZMQ_RESPONSE_BUFFER_SIZE, + rc); + + continue; + } + + buffer.at(rc) = 0; // make sure that we end string with zero before parse + + SWSS_LOG_DEBUG("ntf: %s", buffer.data()); + + std::vector values; + + swss::JSon::readJson((char*)buffer.data(), values); + + swss::FieldValueTuple fvt = values.at(0); + + const std::string& op = fvField(fvt); + const std::string& data = fvValue(fvt); + + values.erase(values.begin()); + + SWSS_LOG_DEBUG("notification: op = %s, data = %s", op.c_str(), data.c_str()); + + m_callback(op, data, values); + } + + SWSS_LOG_NOTICE("exiting notification thread"); +} + +void ZeroMQChannel::setBuffered( + _In_ bool buffered) +{ + SWSS_LOG_ENTER(); + + // not supported +} + +void ZeroMQChannel::flush() +{ + SWSS_LOG_ENTER(); + + // not supported +} + +void ZeroMQChannel::set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& command) +{ + SWSS_LOG_ENTER(); + + std::vector copy = values; + + swss::FieldValueTuple opdata(key, command); + + copy.insert(copy.begin(), opdata); + + std::string msg = swss::JSon::buildJson(copy); + + SWSS_LOG_DEBUG("sending: %s", msg.c_str()); + + int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); + + if (rc <= 0) + { + SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d", + m_endpoint.c_str(), + zmq_errno()); + } +} + +void ZeroMQChannel::del( + _In_ const std::string& key, + _In_ const std::string& command) +{ + SWSS_LOG_ENTER(); + + std::vector values; + + swss::FieldValueTuple opdata(key, command); + + values.insert(values.begin(), opdata); + + std::string msg = swss::JSon::buildJson(values); + + SWSS_LOG_DEBUG("sending: %s", msg.c_str()); + + int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); + + if (rc <= 0) + { + SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d", + m_endpoint.c_str(), + zmq_errno()); + } +} + +sai_status_t ZeroMQChannel::wait( + _In_ const std::string& command, + _Out_ swss::KeyOpFieldsValuesTuple& kco) +{ + SWSS_LOG_ENTER(); + + SWSS_LOG_INFO("wait for %s response", command.c_str()); + + zmq_pollitem_t items [1] = { }; + + items[0].socket = m_socket; + items[0].events = ZMQ_POLLIN; + + int rc = zmq_poll(items, 1, ZMQ_GETRESPONSE_TIMEOUT_MS); + + if (rc == 0) + { + SWSS_LOG_ERROR("zmq_poll timed out for: %s", command.c_str()); + + // notice, at this point we could throw, since in REP/REQ pattern + // we are forced to use send/recv in that specific order + + return SAI_STATUS_FAILURE; + } + + if (rc < 0) + { + SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno()); + } + + rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + + if (rc < 0) + { + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + } + + if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + { + SWSS_LOG_THROW("zmq_recv message was turncated (over %d bytes, recived %d), increase buffer size, message DROPPED", + ZMQ_RESPONSE_BUFFER_SIZE, + rc); + } + + m_buffer.at(rc) = 0; // make sure that we end string with zero before parse + + SWSS_LOG_DEBUG("response: %s", m_buffer.data()); + + std::vector values; + + swss::JSon::readJson((char*)m_buffer.data(), values); + + swss::FieldValueTuple fvt = values.at(0); + + const std::string& opkey = fvField(fvt); + const std::string& op= fvValue(fvt); + + values.erase(values.begin()); + + kfvFieldsValues(kco) = values; + kfvOp(kco) = op; + kfvKey(kco) = opkey; + + SWSS_LOG_INFO("response: op = %s, key = %s", opkey.c_str(), op.c_str()); + + if (op != command) + { + // we can hit this place if there were some timeouts + // as well, if there will be multiple "GET" messages, then + // we can receive response from not the expected GET + + SWSS_LOG_THROW("got not expected response: %s:%s, expected: %s", opkey.c_str(), op.c_str(), command.c_str()); + } + + sai_status_t status; + sai_deserialize_status(opkey, status); + + SWSS_LOG_DEBUG("%s status: %s", command.c_str(), opkey.c_str()); + + return status; +} diff --git a/lib/src/context_config.json b/lib/src/context_config.json index 21a013dba123..71116399ed77 100644 --- a/lib/src/context_config.json +++ b/lib/src/context_config.json @@ -7,6 +7,9 @@ "dbCounters" : "COUNTERS_DB", "dbFlex": "FLEX_COUNTER_DB", "dbState" : "STATE_DB", + "zmq_enable": false, + "zmq_endpoint": "tcp://127.0.0.1:5555", + "zmq_ntf_endpoint": "tcp://127.0.0.1:5556", "switches": [ { "index" : 0, @@ -25,6 +28,9 @@ "dbCounters" : "GB_COUNTERS_DB", "dbFlex": "GB_FLEX_COUNTER_DB", "dbState" : "STATE_DB", + "zmq_enable":false, + "zmq_endpoint": "tcp://127.0.0.1:5565", + "zmq_ntf_endpoint": "tcp://127.0.0.1:5566", "switches": [ { "index" : 0, diff --git a/saidump/Makefile.am b/saidump/Makefile.am index 63d87f773a04..b34c944561ab 100644 --- a/saidump/Makefile.am +++ b/saidump/Makefile.am @@ -10,4 +10,4 @@ endif saidump_SOURCES = saidump.cpp saidump_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) -saidump_LDADD = -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -L$(top_srcdir)/lib/src/.libs -lsairedis +saidump_LDADD = -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -L$(top_srcdir)/lib/src/.libs -lsairedis -lzmq diff --git a/saiplayer/Makefile.am b/saiplayer/Makefile.am index 350ed851b2c9..3670193afe7f 100644 --- a/saiplayer/Makefile.am +++ b/saiplayer/Makefile.am @@ -19,4 +19,4 @@ libSaiPlayer_a_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) -std=c++14 saiplayer_SOURCES = saiplayer.cpp saiplayer_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) -std=c++14 -saiplayer_LDADD = libSaiPlayer.a ../syncd/libSyncd.a ../lib/src/libSaiRedis.a -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta +saiplayer_LDADD = libSaiPlayer.a ../syncd/libSyncd.a ../lib/src/libSaiRedis.a -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -lzmq diff --git a/syncd/Makefile.am b/syncd/Makefile.am index 12d9740edfbd..59dd5f21a137 100644 --- a/syncd/Makefile.am +++ b/syncd/Makefile.am @@ -16,6 +16,11 @@ endif noinst_LIBRARIES = libSyncd.a libSyncdRequestShutdown.a libSyncd_a_SOURCES = \ + ZeroMQSelectableChannel.cpp \ + RedisSelectableChannel.cpp \ + SelectableChannel.cpp \ + ZeroMQNotificationProducer.cpp \ + RedisNotificationProducer.cpp \ Syncd.cpp \ BreakConfig.cpp \ BreakConfigParser.cpp \ @@ -57,7 +62,7 @@ libSyncd_a_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) $(SAIFLAGS) -s syncd_SOURCES = main.cpp syncd_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) $(SAIFLAGS) -syncd_LDADD = libSyncd.a ../lib/src/libSaiRedis.a -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl -lhiredis -lswsscommon $(SAILIB) -lpthread +syncd_LDADD = libSyncd.a ../lib/src/libSaiRedis.a -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl -lhiredis -lswsscommon $(SAILIB) -lpthread -lzmq if SAITHRIFT libSyncd_a_CPPFLAGS += -DSAITHRIFT=yes @@ -84,5 +89,5 @@ syncd_request_shutdown_LDADD = libSyncdRequestShutdown.a ../lib/src/libSaiRedis. tests_SOURCES = tests.cpp tests_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) -tests_LDADD = libSyncd.a -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/lib/src/.libs -lsairedis -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta +tests_LDADD = libSyncd.a -lhiredis -lswsscommon -lpthread -L$(top_srcdir)/lib/src/.libs -lsairedis -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -lzmq TESTS = tests diff --git a/syncd/NotificationProcessor.cpp b/syncd/NotificationProcessor.cpp index 9e80b2519570..42304b7eb030 100644 --- a/syncd/NotificationProcessor.cpp +++ b/syncd/NotificationProcessor.cpp @@ -15,7 +15,7 @@ using namespace syncd; using namespace saimeta; NotificationProcessor::NotificationProcessor( - _In_ std::shared_ptr producer, + _In_ std::shared_ptr producer, _In_ std::shared_ptr client, _In_ std::function synchronizer): m_synchronizer(synchronizer), diff --git a/syncd/NotificationProcessor.h b/syncd/NotificationProcessor.h index a92065855053..13f2eda01cfc 100644 --- a/syncd/NotificationProcessor.h +++ b/syncd/NotificationProcessor.h @@ -3,6 +3,7 @@ #include "NotificationQueue.h" #include "VirtualOidTranslator.h" #include "RedisClient.h" +#include "NotificationProducerBase.h" #include "swss/notificationproducer.h" @@ -18,7 +19,7 @@ namespace syncd public: NotificationProcessor( - _In_ std::shared_ptr producer, + _In_ std::shared_ptr producer, _In_ std::shared_ptr client, _In_ std::function synchronizer); @@ -131,6 +132,6 @@ namespace syncd std::shared_ptr m_client; - std::shared_ptr m_notifications; + std::shared_ptr m_notifications; }; } diff --git a/syncd/NotificationProducerBase.h b/syncd/NotificationProducerBase.h new file mode 100644 index 000000000000..a1a643c443b1 --- /dev/null +++ b/syncd/NotificationProducerBase.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include "swss/table.h" +#include "swss/sal.h" + +namespace syncd +{ + class NotificationProducerBase + { + public: + + NotificationProducerBase() = default; + + virtual ~NotificationProducerBase() = default; + + public: + + virtual void send( + _In_ const std::string& op, + _In_ const std::string& data, + _In_ const std::vector& values) = 0; + }; +} diff --git a/syncd/RedisNotificationProducer.cpp b/syncd/RedisNotificationProducer.cpp new file mode 100644 index 000000000000..4a4ca709d3bd --- /dev/null +++ b/syncd/RedisNotificationProducer.cpp @@ -0,0 +1,29 @@ +#include "RedisNotificationProducer.h" + +#include "sairediscommon.h" + +#include "swss/logger.h" + +using namespace syncd; + +RedisNotificationProducer::RedisNotificationProducer( + _In_ const std::string& dbName) +{ + SWSS_LOG_ENTER(); + + m_db = std::make_shared(dbName, 0); + + m_notificationProducer = std::make_shared(m_db.get(), REDIS_TABLE_NOTIFICATIONS); +} + +void RedisNotificationProducer::send( + _In_ const std::string& op, + _In_ const std::string& data, + _In_ const std::vector& values) +{ + SWSS_LOG_ENTER(); + + std::vector vals = values; + + m_notificationProducer->send(op, data, vals); +} diff --git a/syncd/RedisNotificationProducer.h b/syncd/RedisNotificationProducer.h new file mode 100644 index 000000000000..6165bfc1417e --- /dev/null +++ b/syncd/RedisNotificationProducer.h @@ -0,0 +1,33 @@ +#pragma once + +#include "NotificationProducerBase.h" + +#include "swss/dbconnector.h" +#include "swss/notificationproducer.h" + +namespace syncd +{ + class RedisNotificationProducer: + public NotificationProducerBase + { + public: + + RedisNotificationProducer( + _In_ const std::string& dbName); + + virtual ~RedisNotificationProducer() = default; + + public: + + virtual void send( + _In_ const std::string& op, + _In_ const std::string& data, + _In_ const std::vector& values) override; + + private: + + std::shared_ptr m_db; + + std::shared_ptr m_notificationProducer; + }; +} diff --git a/syncd/RedisSelectableChannel.cpp b/syncd/RedisSelectableChannel.cpp new file mode 100644 index 000000000000..d56db32795ba --- /dev/null +++ b/syncd/RedisSelectableChannel.cpp @@ -0,0 +1,112 @@ +#include "RedisSelectableChannel.h" + +#include "swss/logger.h" + +using namespace syncd; + +RedisSelectableChannel::RedisSelectableChannel( + _In_ std::shared_ptr dbAsic, + _In_ const std::string& asicStateTable, + _In_ const std::string& getResponseTable, + _In_ const std::string& tempPrefix, + _In_ bool modifyRedis): + m_dbAsic(dbAsic), + m_tempPrefix(tempPrefix), + m_modifyRedis(modifyRedis) +{ + SWSS_LOG_ENTER(); + + m_asicState = std::make_shared(m_dbAsic.get(), asicStateTable); + + /* + * At the end we cant use producer consumer concept since if one process + * will restart there may be something in the queue also "remove" from + * response queue will also trigger another "response". + */ + + m_getResponse = std::make_shared(m_dbAsic.get(), getResponseTable); +} + +bool RedisSelectableChannel::empty() +{ + SWSS_LOG_ENTER(); + + return m_asicState->empty(); +} + +void RedisSelectableChannel::set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& op) +{ + SWSS_LOG_ENTER(); + + m_getResponse->set(key, values, op); +} + +void RedisSelectableChannel::pop( + _Out_ swss::KeyOpFieldsValuesTuple& kco, + _In_ bool initViewMode) +{ + SWSS_LOG_ENTER(); + + if (initViewMode) + { + m_asicState->pop(kco, m_tempPrefix); + } + else + { + m_asicState->pop(kco); + } +} + +// Selectable overrides + +int RedisSelectableChannel::getFd() +{ + SWSS_LOG_ENTER(); + + return m_asicState->getFd(); +} + +uint64_t RedisSelectableChannel::readData() +{ + SWSS_LOG_ENTER(); + + return m_asicState->readData(); +} + +bool RedisSelectableChannel::hasData() +{ + SWSS_LOG_ENTER(); + + return m_asicState->hasData(); +} + +bool RedisSelectableChannel::hasCachedData() +{ + SWSS_LOG_ENTER(); + + return m_asicState->hasCachedData(); +} + +bool RedisSelectableChannel::initializedWithData() +{ + SWSS_LOG_ENTER(); + + return m_asicState->initializedWithData(); +} + +void RedisSelectableChannel::updateAfterRead() +{ + SWSS_LOG_ENTER(); + + return m_asicState->updateAfterRead(); +} + +int RedisSelectableChannel::getPri() const +{ + SWSS_LOG_ENTER(); + + return m_asicState->getPri(); +} diff --git a/syncd/RedisSelectableChannel.h b/syncd/RedisSelectableChannel.h new file mode 100644 index 000000000000..66a9215bf32c --- /dev/null +++ b/syncd/RedisSelectableChannel.h @@ -0,0 +1,65 @@ +#pragma once + +#include "SelectableChannel.h" + +#include "swss/consumertable.h" +#include "swss/producertable.h" + +namespace syncd +{ + class RedisSelectableChannel: + public SelectableChannel + { + public: + + RedisSelectableChannel( + _In_ std::shared_ptr dbAsic, + _In_ const std::string& asicStateTable, + _In_ const std::string& getResponseTable, + _In_ const std::string& tempPrefix, + _In_ bool modifyRedis); + + virtual ~RedisSelectableChannel() = default; + + public: // SelectableChannel overrides + + virtual bool empty() override; + + virtual void pop( + _Out_ swss::KeyOpFieldsValuesTuple& kco, + _In_ bool initViewMode) override; + + virtual void set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& op) override; + + public: // Selectable overrides + + virtual int getFd() override; + + virtual uint64_t readData() override; + + virtual bool hasData() override; + + virtual bool hasCachedData() override; + + virtual bool initializedWithData() override; + + virtual void updateAfterRead() override; + + virtual int getPri() const override; + + private: + + std::shared_ptr m_dbAsic; + + std::shared_ptr m_asicState; + + std::shared_ptr m_getResponse; + + std::string m_tempPrefix; + + bool m_modifyRedis; + }; +} diff --git a/syncd/SaiSwitch.cpp b/syncd/SaiSwitch.cpp index 8e0e48e76d97..d26a74944582 100644 --- a/syncd/SaiSwitch.cpp +++ b/syncd/SaiSwitch.cpp @@ -617,7 +617,7 @@ bool SaiSwitch::isNonRemovableRid( */ /* Here we are checking for isSwitchObjectDefaultRid first then ColdBootDiscoveredRid - * as it is possible we can discover switch Internal OID as part of warm-booot also especially + * as it is possible we can discover switch Internal OID as part of warm-boot also especially * when we are doing SAI upgrade as part of warm-boot.*/ if (isSwitchObjectDefaultRid(rid)) diff --git a/syncd/SelectableChannel.cpp b/syncd/SelectableChannel.cpp new file mode 100644 index 000000000000..42206918ba4a --- /dev/null +++ b/syncd/SelectableChannel.cpp @@ -0,0 +1,14 @@ +#include "SelectableChannel.h" + +#include "swss/logger.h" + +using namespace syncd; + +SelectableChannel::SelectableChannel( + _In_ int pri): + Selectable(pri) +{ + SWSS_LOG_ENTER(); + + // empty +} diff --git a/syncd/SelectableChannel.h b/syncd/SelectableChannel.h new file mode 100644 index 000000000000..3f25746a0fb1 --- /dev/null +++ b/syncd/SelectableChannel.h @@ -0,0 +1,35 @@ +#pragma once + +#include "swss/selectable.h" +#include "swss/table.h" +#include "swss/sal.h" + +#include +#include + +namespace syncd +{ + class SelectableChannel: + public swss::Selectable + { + public: + + SelectableChannel( + _In_ int pri = 0); + + virtual ~SelectableChannel() = default; + + public: + + virtual bool empty() = 0; + + virtual void pop( + _Out_ swss::KeyOpFieldsValuesTuple& kco, + _In_ bool initViewMode) = 0; + + virtual void set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& op) = 0; + }; +} diff --git a/syncd/Syncd.cpp b/syncd/Syncd.cpp index 9d40bf762a7e..0f6ccc74674d 100644 --- a/syncd/Syncd.cpp +++ b/syncd/Syncd.cpp @@ -9,6 +9,10 @@ #include "WarmRestartTable.h" #include "ContextConfigContainer.h" #include "BreakConfigParser.h" +#include "RedisNotificationProducer.h" +#include "ZeroMQNotificationProducer.h" +#include "RedisSelectableChannel.h" +#include "ZeroMQSelectableChannel.h" #include "sairediscommon.h" @@ -67,9 +71,29 @@ Syncd::Syncd( m_dbAsic = std::make_shared(m_contextConfig->m_dbAsic, 0); - m_dbNtf = std::make_shared(m_contextConfig->m_dbAsic, 0); + if (m_contextConfig->m_zmqEnable) + { + m_notifications = std::make_shared(m_contextConfig->m_zmqNtfEndpoint); + + SWSS_LOG_NOTICE("zmq enabled, forcing sync mode"); + + m_commandLineOptions->m_enableSyncMode = true; + + m_selectableChannel = std::make_shared(m_contextConfig->m_zmqEndpoint); + } + else + { + m_notifications = std::make_shared(m_contextConfig->m_dbAsic); + + bool modifyRedis = m_commandLineOptions->m_enableSyncMode ? false : true; - m_notifications = std::make_shared(m_dbNtf.get(), REDIS_TABLE_NOTIFICATIONS); + m_selectableChannel = std::make_shared( + m_dbAsic, + ASIC_STATE_TABLE, + REDIS_TABLE_GETRESPONSE, + TEMP_PREFIX, + modifyRedis); + } m_client = std::make_shared(m_dbAsic); @@ -84,11 +108,8 @@ Syncd::Syncd( m_handler->setSwitchNotifications(m_sn.getSwitchNotifications()); - m_asicState = std::make_shared(m_dbAsic.get(), ASIC_STATE_TABLE); m_restartQuery = std::make_shared(m_dbAsic.get(), SYNCD_NOTIFICATION_CHANNEL_RESTARTQUERY); - m_asicState->setModifyRedis(m_commandLineOptions->m_enableSyncMode ? false : true); - // TODO to be moved to ASIC_DB m_dbFlexCounter = std::make_shared(m_contextConfig->m_dbFlex, 0); m_flexCounter = std::make_shared(m_dbFlexCounter.get(), FLEX_COUNTER_TABLE); @@ -108,14 +129,6 @@ Syncd::Syncd( m_processor->m_translator = m_translator; // TODO as param - /* - * At the end we cant use producer consumer concept since if one process - * will restart there may be something in the queue also "remove" from - * response queue will also trigger another "response". - */ - - m_getResponse = std::make_shared(m_dbAsic.get(), REDIS_TABLE_GETRESPONSE); - m_veryFirstRun = isVeryFirstRun(); performStartupLogic(); @@ -224,7 +237,7 @@ bool Syncd::isInitViewMode() const } void Syncd::processEvent( - _In_ swss::ConsumerTable &consumer) + _In_ SelectableChannel& consumer) { SWSS_LOG_ENTER(); @@ -234,20 +247,13 @@ void Syncd::processEvent( { swss::KeyOpFieldsValuesTuple kco; - if (isInitViewMode()) - { - /* - * In init mode we put all data to TEMP view and we snoop. We need - * to specify temporary view prefix in consumer since consumer puts - * data to redis db. - */ + /* + * In init mode we put all data to TEMP view and we snoop. We need + * to specify temporary view prefix in consumer since consumer puts + * data to redis db. + */ - consumer.pop(kco, TEMP_PREFIX); - } - else - { - consumer.pop(kco); - } + consumer.pop(kco, isInitViewMode()); processSingleEvent(kco); } @@ -334,7 +340,7 @@ sai_status_t Syncd::processAttrCapabilityQuery( { SWSS_LOG_ERROR("Invalid input: expected 2 arguments, received %zu", values.size()); - m_getResponse->set(sai_serialize_status(SAI_STATUS_INVALID_PARAMETER), {}, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE); + m_selectableChannel->set(sai_serialize_status(SAI_STATUS_INVALID_PARAMETER), {}, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE); return SAI_STATUS_INVALID_PARAMETER; } @@ -364,7 +370,7 @@ sai_status_t Syncd::processAttrCapabilityQuery( capability.create_implemented, capability.set_implemented, capability.get_implemented); } - m_getResponse->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE); + m_selectableChannel->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_ATTR_CAPABILITY_RESPONSE); return status; } @@ -387,7 +393,7 @@ sai_status_t Syncd::processAttrEnumValuesCapabilityQuery( { SWSS_LOG_ERROR("Invalid input: expected 3 arguments, received %zu", values.size()); - m_getResponse->set(sai_serialize_status(SAI_STATUS_INVALID_PARAMETER), {}, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE); + m_selectableChannel->set(sai_serialize_status(SAI_STATUS_INVALID_PARAMETER), {}, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE); return SAI_STATUS_INVALID_PARAMETER; } @@ -431,7 +437,7 @@ sai_status_t Syncd::processAttrEnumValuesCapabilityQuery( SWSS_LOG_DEBUG("Sending response: capabilities = '%s', count = %d", strCap.c_str(), enumCapList.count); } - m_getResponse->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE); + m_selectableChannel->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_ATTR_ENUM_VALUES_CAPABILITY_RESPONSE); return status; } @@ -484,7 +490,7 @@ sai_status_t Syncd::processObjectTypeGetAvailabilityQuery( SWSS_LOG_DEBUG("Sending response: count = %lu", count); } - m_getResponse->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_RESPONSE); + m_selectableChannel->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_OBJECT_TYPE_GET_AVAILABILITY_RESPONSE); return status; } @@ -523,7 +529,7 @@ sai_status_t Syncd::processFdbFlush( sai_status_t status = m_vendorSai->flushFdbEntries(switchRid, attr_count, attr_list); - m_getResponse->set(sai_serialize_status(status), {} , REDIS_ASIC_STATE_COMMAND_FLUSHRESPONSE); + m_selectableChannel->set(sai_serialize_status(status), {} , REDIS_ASIC_STATE_COMMAND_FLUSHRESPONSE); return status; } @@ -542,7 +548,7 @@ sai_status_t Syncd::processClearStatsEvent( { SWSS_LOG_WARN("VID to RID translation failure: %s", key.c_str()); sai_status_t status = SAI_STATUS_INVALID_OBJECT_ID; - m_getResponse->set(sai_serialize_status(status), {}, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); + m_selectableChannel->set(sai_serialize_status(status), {}, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); return status; } @@ -569,7 +575,7 @@ sai_status_t Syncd::processClearStatsEvent( (uint32_t)counter_ids.size(), counter_ids.data()); - m_getResponse->set(sai_serialize_status(status), {}, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); + m_selectableChannel->set(sai_serialize_status(status), {}, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); return status; } @@ -630,7 +636,7 @@ sai_status_t Syncd::processGetStatsEvent( } } - m_getResponse->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); + m_selectableChannel->set(sai_serialize_status(status), entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); return status; } @@ -1273,7 +1279,7 @@ void Syncd::sendApiResponse( sai_serialize_common_api(api).c_str(), strStatus.c_str()); - m_getResponse->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); + m_selectableChannel->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); SWSS_LOG_INFO("response for %s api was send", sai_serialize_common_api(api).c_str()); @@ -2111,7 +2117,7 @@ void Syncd::sendGetResponse( * response will not put any data to table, only queue is used. */ - m_getResponse->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); + m_selectableChannel->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_GETRESPONSE); SWSS_LOG_INFO("response for GET api was send"); } @@ -2594,7 +2600,7 @@ void Syncd::sendNotifyResponse( SWSS_LOG_INFO("sending response: %s", strStatus.c_str()); - m_getResponse->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_NOTIFY); + m_selectableChannel->set(strStatus, entry, REDIS_ASIC_STATE_COMMAND_NOTIFY); } void Syncd::clearTempView() @@ -3637,7 +3643,7 @@ void Syncd::run() SWSS_LOG_NOTICE("syncd listening for events"); - s->addSelectable(m_asicState.get()); + s->addSelectable(m_selectableChannel.get()); s->addSelectable(m_restartQuery.get()); s->addSelectable(m_flexCounter.get()); s->addSelectable(m_flexCounterGroup.get()); @@ -3678,11 +3684,11 @@ void Syncd::run() * lead to unable to find some objects. */ - SWSS_LOG_NOTICE("is asic queue empty: %d", m_asicState->empty()); + SWSS_LOG_NOTICE("is asic queue empty: %d", m_selectableChannel->empty()); - while (!m_asicState->empty()) + while (!m_selectableChannel->empty()) { - processEvent(*m_asicState.get()); + processEvent(*m_selectableChannel.get()); } SWSS_LOG_NOTICE("drained queue"); @@ -3748,9 +3754,9 @@ void Syncd::run() { processFlexCounterGroupEvent(*(swss::ConsumerTable*)sel); } - else if (sel == m_asicState.get()) + else if (sel == m_selectableChannel.get()) { - processEvent(*(swss::ConsumerTable*)sel); + processEvent(*m_selectableChannel.get()); } else { diff --git a/syncd/Syncd.h b/syncd/Syncd.h index d94f90a3f581..6aea2e527481 100644 --- a/syncd/Syncd.h +++ b/syncd/Syncd.h @@ -15,6 +15,8 @@ #include "RequestShutdown.h" #include "ContextConfig.h" #include "BreakConfig.h" +#include "NotificationProducerBase.h" +#include "SelectableChannel.h" #include "meta/SaiAttributeList.h" @@ -61,7 +63,7 @@ namespace syncd public: // TODO private void processEvent( - _In_ swss::ConsumerTable &consumer); + _In_ SelectableChannel& consumer); sai_status_t processQuadEventInInitViewMode( _In_ sai_object_type_t objectType, @@ -356,8 +358,6 @@ namespace syncd std::shared_ptr m_manager; - std::shared_ptr m_getResponse; - /** * @brief set of objects removed by user when we are in init view * mode. Those could be vlan members, bridge ports etc. @@ -408,6 +408,8 @@ namespace syncd std::shared_ptr m_processor; + std::shared_ptr m_selectableChannel; + private: /** @@ -438,17 +440,13 @@ namespace syncd std::shared_ptr m_dbAsic; - std::shared_ptr m_dbNtf; - - std::shared_ptr m_asicState; - std::shared_ptr m_restartQuery; std::shared_ptr m_dbFlexCounter; std::shared_ptr m_flexCounter; std::shared_ptr m_flexCounterGroup; - std::shared_ptr m_notifications; + std::shared_ptr m_notifications; std::shared_ptr m_switchConfigContainer; std::shared_ptr m_redisVidIndexGenerator; diff --git a/syncd/ZeroMQNotificationProducer.cpp b/syncd/ZeroMQNotificationProducer.cpp new file mode 100644 index 000000000000..54faddbd8845 --- /dev/null +++ b/syncd/ZeroMQNotificationProducer.cpp @@ -0,0 +1,61 @@ +#include "ZeroMQNotificationProducer.h" + +#include + +using namespace syncd; + +ZeroMQNotificationProducer::ZeroMQNotificationProducer( + _In_ const std::string& ntfEndpoint): + m_ntfContext(nullptr), + m_ntfSocket(nullptr) +{ + SWSS_LOG_ENTER(); + + m_ntfContext = zmq_ctx_new(); + + m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_PUB); + + SWSS_LOG_NOTICE("opening zmq ntf endpoint: %s", ntfEndpoint.c_str()); + + int rc = zmq_connect(m_ntfSocket, ntfEndpoint.c_str()); + + if (rc != 0) + { + SWSS_LOG_THROW("failed to open zmq ntf endpoint %s, zmqerrno: %d", + ntfEndpoint.c_str(), + zmq_errno()); + } +} + +ZeroMQNotificationProducer::~ZeroMQNotificationProducer() +{ + SWSS_LOG_ENTER(); + + zmq_close(m_ntfSocket); + zmq_ctx_destroy(m_ntfContext); +} + +void ZeroMQNotificationProducer::send( + _In_ const std::string& op, + _In_ const std::string& data, + _In_ const std::vector& values) +{ + SWSS_LOG_ENTER(); + + std::vector vals = values; + + swss::FieldValueTuple opdata(op, data); + + vals.insert(vals.begin(), opdata); + + std::string msg = swss::JSon::buildJson(vals); + + SWSS_LOG_DEBUG("sending: %s", msg.c_str()); + + int rc = zmq_send(m_ntfSocket, msg.c_str(), msg.length(), 0); + + if (rc < 0) + { + SWSS_LOG_THROW("zmq_send failed, zmqerrno: %d", zmq_errno()); + } +} diff --git a/syncd/ZeroMQNotificationProducer.h b/syncd/ZeroMQNotificationProducer.h new file mode 100644 index 000000000000..3c1647c44997 --- /dev/null +++ b/syncd/ZeroMQNotificationProducer.h @@ -0,0 +1,33 @@ +#pragma once + +#include "NotificationProducerBase.h" + +#include "swss/dbconnector.h" +#include "swss/notificationproducer.h" + +namespace syncd +{ + class ZeroMQNotificationProducer: + public NotificationProducerBase + { + public: + + ZeroMQNotificationProducer( + _In_ const std::string& ntfEndpoint); + + virtual ~ZeroMQNotificationProducer(); + + public: + + virtual void send( + _In_ const std::string& op, + _In_ const std::string& data, + _In_ const std::vector& values) override; + + private: + + void* m_ntfContext; + + void* m_ntfSocket; + }; +} diff --git a/syncd/ZeroMQSelectableChannel.cpp b/syncd/ZeroMQSelectableChannel.cpp new file mode 100644 index 000000000000..2995efb782a7 --- /dev/null +++ b/syncd/ZeroMQSelectableChannel.cpp @@ -0,0 +1,257 @@ +#include "ZeroMQSelectableChannel.h" + +#include "swss/logger.h" +#include "swss/json.h" + +#include +#include + +#define ZMQ_RESPONSE_BUFFER_SIZE (4*1024*1024) + +#define ZMQ_POLL_TIMEOUT (2*60*1000) + +using namespace syncd; + +ZeroMQSelectableChannel::ZeroMQSelectableChannel( + _In_ const std::string& endpoint): + m_endpoint(endpoint), + m_context(nullptr), + m_socket(nullptr), + m_fd(0), + m_allowZmqPoll(false), + m_runThread(true) +{ + SWSS_LOG_ENTER(); + + m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + + m_context = zmq_ctx_new();; + + m_socket = zmq_socket(m_context, ZMQ_REP); + + int rc = zmq_bind(m_socket, endpoint.c_str()); + + if (rc != 0) + { + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", + endpoint.c_str(), + zmq_errno()); + } + + size_t fd_len = sizeof(m_fd); + + rc = zmq_getsockopt(m_socket, ZMQ_FD, &m_fd, &fd_len); + + if (rc != 0) + { + SWSS_LOG_THROW("zmq_getsockopt failed on endpoint: %s, zmqerrno: %d", + endpoint.c_str(), + zmq_errno()); + } + + m_zmlPollThread = std::make_shared(&ZeroMQSelectableChannel::zmqPollThread, this); +} + +ZeroMQSelectableChannel::~ZeroMQSelectableChannel() +{ + SWSS_LOG_ENTER(); + + m_runThread = false; + m_allowZmqPoll = true; + + zmq_close(m_socket); + zmq_ctx_destroy(m_context); + + SWSS_LOG_NOTICE("ending zmq poll thread"); + + m_zmlPollThread = nullptr; + + SWSS_LOG_NOTICE("ended zmq poll thread"); +} + +void ZeroMQSelectableChannel::zmqPollThread() +{ + SWSS_LOG_ENTER(); + + SWSS_LOG_NOTICE("begin"); + + while (m_runThread) + { + zmq_pollitem_t items [1] = { }; + + items[0].socket = m_socket; + items[0].events = ZMQ_POLLIN; + + m_allowZmqPoll = false; + + int rc = zmq_poll(items, 1, ZMQ_POLL_TIMEOUT); + + if (rc <= 0 && zmq_errno() == ETERM) + { + SWSS_LOG_NOTICE("zmq_poll ETERM"); + break; + } + + if (rc == 0) + { + SWSS_LOG_INFO("zmq_poll: no events, continue"); + continue; + } + + // TODO we should have loop here in case we get multiple events since + // zmq poll will only signal events once, but in our case we don't + // expect multiple events, since we want to send/receive + + int zmq_events = 0; + size_t zmq_events_len = sizeof(zmq_events); + + rc = zmq_getsockopt(m_socket, ZMQ_EVENTS, &zmq_events, &zmq_events_len); + + if (rc != 0) + { + SWSS_LOG_ERROR("zmq_getsockopt FAILED, zmq_errno: %d", zmq_errno()); + break; + } + + if (rc == 0 && zmq_events & ZMQ_POLLIN) + { + m_selectableEvent.notify(); // will release epoll + + while (m_runThread && !m_allowZmqPoll) + { + usleep(10); // could be increased or replaced by spin lock + + //SWSS_LOG_NOTICE("m_allowZmqPoll == false"); + } + } + else + { + // should not happen, we only except ZMQ_POLLIN events + + SWSS_LOG_ERROR("unknown condition: rc: %d, zmq_events: %d, bug?", rc, zmq_events); + break; + } + } + + SWSS_LOG_NOTICE("end"); +} + +// SelectableChannel overrides + +bool ZeroMQSelectableChannel::empty() +{ + SWSS_LOG_ENTER(); + + return m_queue.size() == 0; +} + +void ZeroMQSelectableChannel::pop( + _Out_ swss::KeyOpFieldsValuesTuple& kco, + _In_ bool initViewMode) +{ + SWSS_LOG_ENTER(); + + if (m_queue.empty()) + { + SWSS_LOG_ERROR("queue is empty, can't pop"); + throw std::runtime_error("queue is empty, can't pop"); + } + + std::string msg = m_queue.front(); + m_queue.pop(); + + auto& values = kfvFieldsValues(kco); + + values.clear(); + + swss::JSon::readJson(msg, values); + + swss::FieldValueTuple fvt = values.at(0); + + kfvKey(kco) = fvField(fvt); + kfvOp(kco) = fvValue(fvt); + + values.erase(values.begin()); +} + +void ZeroMQSelectableChannel::set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& op) +{ + SWSS_LOG_ENTER(); + + std::vector copy = values; + + swss::FieldValueTuple opdata(key, op); + + copy.insert(copy.begin(), opdata); + + std::string msg = swss::JSon::buildJson(copy); + + SWSS_LOG_DEBUG("sending: %s", msg.c_str()); + + int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0); + + // at this point we already did send/receive pattern, so we can notify + // thread that we can poll again + m_allowZmqPoll = true; + + if (rc <= 0) + { + SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d", + m_endpoint.c_str(), + zmq_errno()); + } +} + +// Selectable overrides + +int ZeroMQSelectableChannel::getFd() +{ + SWSS_LOG_ENTER(); + + return m_selectableEvent.getFd(); +} + +uint64_t ZeroMQSelectableChannel::readData() +{ + SWSS_LOG_ENTER(); + + // clear selectable event so it could be triggered in next select() + m_selectableEvent.readData(); + + int rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + + if (rc < 0) + { + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + } + + if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + { + SWSS_LOG_THROW("zmq_recv message was turncated (over %d bytes, recived %d), increase buffer size, message DROPPED", + ZMQ_RESPONSE_BUFFER_SIZE, + rc); + } + + m_buffer.at(rc) = 0; // make sure that we end string with zero before parse + + m_queue.push((char*)m_buffer.data()); + + return 0; +} + +bool ZeroMQSelectableChannel::hasData() +{ + SWSS_LOG_ENTER(); + + return m_queue.size() > 0; +} + +bool ZeroMQSelectableChannel::hasCachedData() +{ + SWSS_LOG_ENTER(); + + return m_queue.size() > 1; +} diff --git a/syncd/ZeroMQSelectableChannel.h b/syncd/ZeroMQSelectableChannel.h new file mode 100644 index 000000000000..fd3f167ffaad --- /dev/null +++ b/syncd/ZeroMQSelectableChannel.h @@ -0,0 +1,79 @@ +#pragma once + +#include "SelectableChannel.h" + +#include "swss/table.h" +#include "swss/selectableevent.h" + +#include +#include +#include + +namespace syncd +{ + class ZeroMQSelectableChannel: + public SelectableChannel + { + public: + + ZeroMQSelectableChannel( + _In_ const std::string& endpoint); + + virtual ~ZeroMQSelectableChannel(); + + public: // SelectableChannel overrides + + virtual bool empty() override; + + virtual void pop( + _Out_ swss::KeyOpFieldsValuesTuple& kco, + _In_ bool initViewMode) override; + + virtual void set( + _In_ const std::string& key, + _In_ const std::vector& values, + _In_ const std::string& op) override; + + public: // Selectable overrides + + virtual int getFd() override; + + virtual uint64_t readData() override; + + virtual bool hasData() override; + + virtual bool hasCachedData() override; + + // virtual bool initializedWithData() override; + + // virtual void updateAfterRead() override; + + // virtual int getPri() const override; + + private: + + void zmqPollThread(); + + private: + + std::string m_endpoint; + + void* m_context; + + void* m_socket; + + int m_fd; + + std::queue m_queue; + + std::vector m_buffer; + + volatile bool m_allowZmqPoll; + + volatile bool m_runThread; + + std::shared_ptr m_zmlPollThread; + + swss::SelectableEvent m_selectableEvent; + }; +} diff --git a/tests/BCM56850.pl b/tests/BCM56850.pl index 5d5e6a2a7897..b06878baecd5 100755 --- a/tests/BCM56850.pl +++ b/tests/BCM56850.pl @@ -553,7 +553,19 @@ sub test_brcm_acl_limit play "acl_limit.rec"; } +sub test_brcm_buffer_pool_zmq +{ + fresh_start("-p", "$utils::DIR/vsprofile_ctx_zmq.ini", "-s", "-g", "0", "-x", "$utils::DIR/ctx_zmq.json"); + + # we expect no operations on asic, and all buffer pools will be matched correctly + + play("-m", "-p", "$utils::DIR/vsprofile_ctx_zmq.ini", "full_buffer.rec"); + play("-m", "-p", "$utils::DIR/vsprofile_ctx_zmq.ini", "full_buffer_second.rec",0); +} + + # RUN TESTS +test_brcm_buffer_pool_zmq; test_brcm_acl_limit; test_sync_brcm_warm_boot_port_remove; diff --git a/tests/BCM56850/ctx_zmq.json b/tests/BCM56850/ctx_zmq.json new file mode 100644 index 000000000000..37e8eaa89005 --- /dev/null +++ b/tests/BCM56850/ctx_zmq.json @@ -0,0 +1,21 @@ +{ + "CONTEXTS": [ + { + "guid" : 0, + "name" : "syncd0", + "dbAsic" : "ASIC_DB", + "dbCounters" : "COUNTERS_DB", + "dbFlex": "FLEX_COUNTER_DB", + "dbState" : "STATE_DB", + "zmq_enable": true, + "zmq_endpoint": "tcp://127.0.0.1:5555", + "zmq_ntf_endpoint": "tcp://127.0.0.1:5556", + "switches": [ + { + "index" : 0, + "hwinfo" : "" + } + ] + } + ] +} diff --git a/tests/BCM56850/vsprofile_ctx_zmq.ini b/tests/BCM56850/vsprofile_ctx_zmq.ini new file mode 100644 index 000000000000..0a619b19d242 --- /dev/null +++ b/tests/BCM56850/vsprofile_ctx_zmq.ini @@ -0,0 +1,8 @@ +SAI_WARM_BOOT_READ_FILE=./sai_warmboot.bin +SAI_WARM_BOOT_WRITE_FILE=./sai_warmboot.bin +SAI_VS_SWITCH_TYPE=SAI_VS_SWITCH_TYPE_BCM56850 +SAI_VS_INTERFACE_LANE_MAP_FILE=BCM56850/lanemap.ini +#SAI_VS_INTERFACE_LANE_MAP_FILE=BCM56850/lane66.ini +#SAI_VS_HOSTIF_USE_TAP_DEVICE=true +#SAI_VS_RESOURCE_LIMITER_FILE=BCM56850/limits.ini +SAI_REDIS_CONTEXT_CONFIG=BCM56850/ctx_zmq.json diff --git a/tests/Makefile.am b/tests/Makefile.am index 76248af0fcbb..5ba8a4e2b2e3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -13,7 +13,7 @@ SAILIB=-L$(top_srcdir)/vslib/src/.libs -lsaivs vssyncd_SOURCES = ../syncd/main.cpp vssyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) $(SAIFLAGS) -vssyncd_LDADD = ../syncd/libSyncd.a ../lib/src/libSaiRedis.a -lhiredis -lswsscommon $(SAILIB) -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl +vssyncd_LDADD = ../syncd/libSyncd.a ../lib/src/libSaiRedis.a -lhiredis -lswsscommon $(SAILIB) -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl -lzmq if SAITHRIFT vssyncd_LDADD += -lrpcserver -lthrift diff --git a/tests/aspell.en.pws b/tests/aspell.en.pws index bb6bdbf37584..6ab0c4e576fd 100644 --- a/tests/aspell.en.pws +++ b/tests/aspell.en.pws @@ -72,6 +72,9 @@ encap endif endl enum +epoll +errno +ETERM eth ethernet ethX @@ -173,6 +176,7 @@ queueId QUEUEs queueStats readonly +recv redis Redis refactor @@ -181,6 +185,7 @@ refactoring reimplement reinit removedVidToRid +REQ RID RIDs RIDTOVID @@ -286,3 +291,7 @@ WRED www xoff xon +zero +zeromq +zmq +ZMQ diff --git a/tests/utils.pm b/tests/utils.pm index e40afc7ba935..5941c5be0df3 100644 --- a/tests/utils.pm +++ b/tests/utils.pm @@ -83,17 +83,34 @@ sub request_warm_shutdown sub play_common { - my $sync = shift; - my $file = shift; - my $asicop = shift; + my @params = @_; - print color('bright_blue') . "Replay $file" . color('reset') . "\n"; + my $len = @params; - my @ret = `../saiplayer/saiplayer $sync -u "$DIR/$file"`; + my $asicop = $params[$len-1]; + + if ($asicop =~ /^\d+$/) + { + pop@params; + $len = @params; + } + else + { + undef $asicop; + } + + for (my $i = 0; $i < $len; $i++) + { + $params[$i] = "$DIR/$params[$i]" if $params[$i] =~ /\.rec$/; + } + + print color('bright_blue') . "Replay @params" . color('reset') . "\n"; + + my @ret = `../saiplayer/saiplayer -u @params`; if ($? != 0) { - print color('red') . "player $DIR/$file: exitcode: $?:" . color('reset') . "\n"; + print color('red') . "player @params: exitcode: $?:" . color('reset') . "\n"; exit 1; } @@ -125,7 +142,7 @@ sub play_common sub play { - play_common "", @_; + play_common @_; } sub sync_play