diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h
index 0c6471a9916..f47e5545f53 100644
--- a/src/ripple/app/ledger/InboundLedger.h
+++ b/src/ripple/app/ledger/InboundLedger.h
@@ -59,7 +59,7 @@ class InboundLedger final : public TimeoutCounter,
 
     // Called when another attempt is made to fetch this same ledger
     void
-    update(std::uint32_t seq);
+    update(std::uint32_t seq, bool broadcast);
 
     /** Returns true if we got all the data. */
     bool
@@ -90,7 +90,7 @@ class InboundLedger final : public TimeoutCounter,
     bool
     checkLocal();
     void
-    init(ScopedLockType& collectionLock);
+    init(ScopedLockType& collectionLock, bool broadcast);
 
     bool
     gotData(
diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp
index 2c314787fd2..d263ee3e641 100644
--- a/src/ripple/app/ledger/impl/InboundLedger.cpp
+++ b/src/ripple/app/ledger/impl/InboundLedger.cpp
@@ -103,7 +103,7 @@ InboundLedger::InboundLedger(
 }
 
 void
-InboundLedger::init(ScopedLockType& collectionLock)
+InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
 {
     ScopedLockType sl(mtx_);
     collectionLock.unlock();
@@ -148,8 +148,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
     }
 
     if (!complete_)
     {
-        addPeers();
-        queueJob(sl);
+        if (broadcast)
+        {
+            addPeers();
+            queueJob(sl);
+        }
+        else
+        {
+            // Delay to give time to build the ledger before sending
+            JLOG(journal_.debug()) << "init: Deferring peer requests";
+            deferred_ = true;
+            setTimer(sl);
+        }
         return;
     }
@@ -180,7 +190,7 @@ InboundLedger::getPeerCount() const
 }
 
 void
-InboundLedger::update(std::uint32_t seq)
+InboundLedger::update(std::uint32_t seq, bool broadcast)
 {
     ScopedLockType sl(mtx_);
 
@@ -190,6 +200,24 @@ InboundLedger::update(std::uint32_t seq)
 
     // Prevent this from being swept
     touch();
+
+    // If the signal is to broadcast, and this request has never tried to
+    // broadcast before, cancel any waiting timer, then fire off the job to
+    // broadcast. Note that this is calling mPeerSet->getPeerIds(), not
+    // getPeerCount(), because the latter will filter out peers that have been
+    // tried, but are since lost. This wants to check if peers have _ever_ been
+    // tried. If they have, stick with the normal timer flow.
+    if (broadcast && mPeerSet->getPeerIds().empty())
+    {
+        if (cancelTimer(sl))
+        {
+            JLOG(journal_.debug())
+                << "update: cancelling timer to send peer requests";
+            deferred_ = false;
+            addPeers();
+            queueJob(sl);
+        }
+    }
 }
 
 bool
diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp
index c3ec5632d13..c8d789d977f 100644
--- a/src/ripple/app/ledger/impl/InboundLedgers.cpp
+++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp
@@ -112,6 +112,18 @@ class InboundLedgersImp : public InboundLedgers
                 // the network, and doesn't have the necessary tx's and
                 // ledger entries to build the ledger.
                 bool const isFull = app_.getOPs().isFull();
+                // fallingBehind means the last closed ledger is at least 2
+                // behind the validated ledger. If the node is falling
+                // behind the network, it probably needs information from
+                // the network to catch up.
+                //
+                // The reason this should not simply be only at least 1
+                // behind the validated ledger is that a slight lag is the
+                // normal case because some nodes get there slightly later
+                // than others. A difference of 2 means that at least a full
+                // ledger interval has passed, so the node is beginning to
+                // fall behind.
+                bool const fallingBehind = app_.getOPs().isFallingBehind();
                 // If everything else is ok, don't try to acquire the ledger
                 // if the requested seq is in the near future relative to
                 // the validated ledger. If the requested ledger is between
@@ -134,6 +146,7 @@ class InboundLedgersImp : public InboundLedgers
                     reason == InboundLedger::Reason::CONSENSUS;
                 ss << " Evaluating whether to broadcast requests to peers"
                    << ". full: " << (isFull ? "true" : "false")
+                   << ". falling behind: " << (fallingBehind ? "true" : "false")
                    << ". ledger sequence " << seq
                    << ". Valid sequence: " << validSeq
                    << ". Lag leeway: " << lagLeeway
@@ -144,6 +157,9 @@ class InboundLedgersImp : public InboundLedgers
                 // If the node is not synced, send requests.
                 if (!isFull)
                     return true;
+                // If the node is falling behind, send requests.
+                if (fallingBehind)
+                    return true;
                 // If the ledger is in the near future, do NOT send requests.
                 // This node is probably about to build it.
                 if (nearFuture)
@@ -154,7 +170,7 @@ class InboundLedgersImp : public InboundLedgers
                     return false;
                 return true;
             }();
-            ss << ". Would broadcast to peers? "
+            ss << ". Broadcast to peers? "
               << (shouldBroadcast ? "true." : "false.");
 
             if (!shouldAcquire)
@@ -189,7 +205,7 @@ class InboundLedgersImp : public InboundLedgers
                     std::ref(m_clock),
                     mPeerSetBuilder->build());
                 mLedgers.emplace(hash, inbound);
-                inbound->init(sl);
+                inbound->init(sl, shouldBroadcast);
                 ++mCounter;
             }
         }
@@ -202,7 +218,7 @@ class InboundLedgersImp : public InboundLedgers
             }
 
             if (!isNew)
-                inbound->update(seq);
+                inbound->update(seq, shouldBroadcast);
 
             if (!inbound->isComplete())
             {
diff --git a/src/ripple/app/ledger/impl/TimeoutCounter.cpp b/src/ripple/app/ledger/impl/TimeoutCounter.cpp
index 5e9bcc8141f..b0c40536bf2 100644
--- a/src/ripple/app/ledger/impl/TimeoutCounter.cpp
+++ b/src/ripple/app/ledger/impl/TimeoutCounter.cpp
@@ -74,6 +74,14 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
         });
 }
 
+std::size_t
+TimeoutCounter::cancelTimer(ScopedLockType& sl)
+{
+    auto const ret = timer_.cancel();
+    JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
+    return ret;
+}
+
 void
 TimeoutCounter::queueJob(ScopedLockType& sl)
 {
@@ -108,7 +116,10 @@ TimeoutCounter::invokeOnTimer()
 
     if (!progress_)
     {
-        ++timeouts_;
+        if (deferred_)
+            deferred_ = false;
+        else
+            ++timeouts_;
         JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
                                << " acquiring " << hash_;
         onTimer(false, sl);
diff --git a/src/ripple/app/ledger/impl/TimeoutCounter.h b/src/ripple/app/ledger/impl/TimeoutCounter.h
index 3300e2b733d..630f055d0bc 100644
--- a/src/ripple/app/ledger/impl/TimeoutCounter.h
+++ b/src/ripple/app/ledger/impl/TimeoutCounter.h
@@ -101,6 +101,10 @@ class TimeoutCounter
     void
     setTimer(ScopedLockType&);
 
+    /** Cancel any waiting timer */
+    std::size_t
+    cancelTimer(ScopedLockType&);
+
     /** Queue a job to call invokeOnTimer(). */
     void
     queueJob(ScopedLockType&);
@@ -132,6 +136,9 @@ class TimeoutCounter
     int timeouts_;
     bool complete_;
     bool failed_;
+    /** Whether the initialization deferred doing any work until the first
+     * timeout. */
+    bool deferred_ = false;
     /** Whether forward progress has been made. */
     bool progress_;
     /** The minimum time to wait between calls to execute(). */
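The TimeoutCounter changes above carry the bookkeeping for the deferral: a first attempt that deliberately skipped addPeers() must not be charged as a timeout when its timer fires. Below is a minimal, self-contained sketch of that accounting, not the rippled implementation; the struct and field names are illustrative stand-ins for TimeoutCounter's timeouts_, deferred_, and progress_ members.

```cpp
// Illustrative sketch only: how a deferred first attempt is consumed by the
// first timer expiry without being counted as a failure, mirroring the
// change to TimeoutCounter::invokeOnTimer() above.
struct FetchAttemptSketch
{
    int timeouts = 0;       // counterpart of timeouts_
    bool deferred = false;  // set when init() skipped the initial broadcast
    bool progress = false;  // counterpart of progress_

    void
    onTimerExpired()
    {
        if (!progress)
        {
            if (deferred)
                deferred = false;  // delayed start consumed; not a timeout
            else
                ++timeouts;        // genuinely no data since the last timer
        }
        progress = false;
        // ...the real code then runs onTimer() and re-arms the timer...
    }
};
```

cancelTimer() is the other half: when a later acquire() arrives with the broadcast flag set, the pending deferral timer is cancelled (timer_.cancel() reports how many outstanding waits were aborted), deferred_ is cleared, and the peer requests go out immediately instead of waiting for the expiry.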
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index 41fda5323b7..70b261f596a 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -433,6 +433,8 @@ class NetworkOPsImp final : public NetworkOPs
     clearLedgerFetch() override;
     Json::Value
     getLedgerFetchInfo() override;
+    bool
+    isFallingBehind() const override;
     std::uint32_t
     acceptLedger(
         std::optional<std::chrono::milliseconds> consensusDelay) override;
@@ -732,6 +734,7 @@ class NetworkOPsImp final : public NetworkOPs
     std::atomic<bool> amendmentBlocked_{false};
     std::atomic<bool> amendmentWarned_{false};
     std::atomic<bool> unlBlocked_{false};
+    std::atomic<bool> fallingBehind_{false};
 
     ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
     boost::asio::steady_timer heartbeatTimer_;
@@ -1819,13 +1822,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
 
     auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
 
-    JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
+    JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
                            << " with LCL " << closingInfo.parentHash;
 
-    auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
+    fallingBehind_ = false;
+    if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
+    {
+        fallingBehind_ = true;
+        JLOG(m_journal.warn())
+            << "beginConsensus Current ledger " << closingInfo.seq
+            << " is at least 2 behind validated "
+            << m_ledgerMaster.getValidLedgerIndex();
+    }
+
+    auto const prevLedger =
+        m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
 
     if (!prevLedger)
     {
+        fallingBehind_ = true;
         // this shouldn't happen unless we jump ledgers
         if (mMode == OperatingMode::FULL)
         {
@@ -1873,7 +1888,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
         mLastConsensusPhase = currPhase;
     }
 
-    JLOG(m_journal.debug()) << "Initiating consensus engine";
+    JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
     return true;
 }
 
@@ -1946,7 +1961,7 @@ NetworkOPsImp::endConsensus()
     {
         // check if the ledger is good enough to go to FULL
         // Note: Do not go to FULL if we don't have the previous ledger
-        // check if the ledger is bad enough to go to CONNECTE D -- TODO
+        // check if the ledger is bad enough to go to CONNECTED -- TODO
         auto current = m_ledgerMaster.getCurrentLedger();
         if (app_.timeKeeper().now() < (current->info().parentCloseTime +
                                        2 * current->info().closeTimeResolution))
@@ -2781,6 +2796,12 @@ NetworkOPsImp::getLedgerFetchInfo()
     return app_.getInboundLedgers().getInfo();
 }
 
+bool
+NetworkOPsImp::isFallingBehind() const
+{
+    return fallingBehind_;
+}
+
 void
 NetworkOPsImp::pubProposedTransaction(
     std::shared_ptr<ReadView const> const& ledger,
diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h
index 0fd86b2554d..d720324176a 100644
--- a/src/ripple/app/misc/NetworkOPs.h
+++ b/src/ripple/app/misc/NetworkOPs.h
@@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
     clearLedgerFetch() = 0;
     virtual Json::Value
     getLedgerFetchInfo() = 0;
+    virtual bool
+    isFallingBehind() const = 0;
 
     /** Accepts the current transaction tree, return the new ledger's sequence
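End-to-end, the patch makes InboundLedgers::acquire() compute a single broadcast decision and pass it down to InboundLedger::init()/update(), which either send peer requests right away or defer them behind a timer. The sketch below restates that decision as a free function under stated assumptions: the input and function names are hypothetical, the near-future test is reconstructed from the logged seq/validSeq/lagLeeway values in the diff, and the final consensus-reason check is inferred from the surrounding context rather than shown verbatim in the hunk.

```cpp
#include <cstdint>

// Hypothetical restatement of the shouldBroadcast lambda in
// InboundLedgersImp::acquire(); not the rippled API.
struct BroadcastInputs
{
    bool isFull;             // NetworkOPs::isFull(): node believes it is synced
    bool fallingBehind;      // NetworkOPs::isFallingBehind(): LCL >= 2 behind validated
    bool forConsensus;       // reason == InboundLedger::Reason::CONSENSUS
    std::uint32_t seq;       // requested ledger sequence
    std::uint32_t validSeq;  // validated ledger sequence
    std::uint32_t lagLeeway; // "near future" window above the validated ledger
};

inline bool
shouldBroadcastSketch(BroadcastInputs const& in)
{
    // Not synced: the node needs data from the network, so broadcast.
    if (!in.isFull)
        return true;
    // Falling behind by a full ledger interval or more: broadcast.
    if (in.fallingBehind)
        return true;
    // Requested ledger is in the near future relative to the validated
    // ledger: this node is probably about to build it, so defer.
    if (in.seq > in.validSeq && in.seq < in.validSeq + in.lagLeeway)
        return false;
    // Consensus-driven requests are likewise expected to resolve locally
    // (inferred from the consensus flag computed in the hunk above).
    if (in.forConsensus)
        return false;
    return true;
}
```

When this returns false, init() sets deferred_ and arms the timer instead of calling addPeers(); if a later acquire() for the same hash decides to broadcast before any peers were ever tried (mPeerSet->getPeerIds().empty()), update() cancels the timer and broadcasts immediately. fallingBehind_ itself is refreshed at the top of each beginConsensus() from the gap between the closing ledger and the validated ledger index.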