Skip to content

Commit

Permalink
Optimize when to acquire ledgers from the network.
Browse files Browse the repository at this point in the history
Particularly avoid acquiring ledgers likely to be
produced locally very soon.

Derived from XRPLF#4764

Co-authored-by: Mark Travis <[email protected]>
  • Loading branch information
ximinez and mtrippled committed Sep 5, 2024
1 parent f290324 commit b607ff4
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class InboundLedger final : public TimeoutCounter,

// Called when another attempt is made to fetch this same ledger
void
update(std::uint32_t seq);
update(std::uint32_t seq, bool broadcast);

/** Returns true if we got all the data. */
bool
Expand Down Expand Up @@ -90,7 +90,7 @@ class InboundLedger final : public TimeoutCounter,
bool
checkLocal();
void
init(ScopedLockType& collectionLock);
init(ScopedLockType& collectionLock, bool broadcast);

bool
gotData(
Expand Down
36 changes: 32 additions & 4 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ InboundLedger::InboundLedger(
}

void
InboundLedger::init(ScopedLockType& collectionLock)
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
{
ScopedLockType sl(mtx_);
collectionLock.unlock();
Expand Down Expand Up @@ -148,8 +148,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
}
if (!complete_)
{
addPeers();
queueJob(sl);
if (broadcast)
{
addPeers();
queueJob(sl);
}
else
{
// Delay to give time to build the ledger before sending
JLOG(journal_.debug()) << "init: Deferring peer requests";
deferred_ = true;
setTimer(sl);
}
return;
}

Expand Down Expand Up @@ -180,7 +190,7 @@ InboundLedger::getPeerCount() const
}

void
InboundLedger::update(std::uint32_t seq)
InboundLedger::update(std::uint32_t seq, bool broadcast)
{
ScopedLockType sl(mtx_);

Expand All @@ -190,6 +200,24 @@ InboundLedger::update(std::uint32_t seq)

// Prevent this from being swept
touch();

// If the signal is to broadcast, and this request has never tried to
// broadcast before, cancel any waiting timer, then fire off the job to
// broadcast. Note that this calls mPeerSet->getPeerIds(), not
// getPeerCount(), because the latter filters out peers that have been
// tried but have since been lost. The check here is whether peers have
// _ever_ been tried. If they have, stick with the normal timer flow.
if (broadcast && mPeerSet->getPeerIds().empty())
{
if (cancelTimer(sl))
{
JLOG(journal_.debug())
<< "update: cancelling timer to send peer requests";
deferred_ = false;
addPeers();
queueJob(sl);
}
}
}

bool
Expand Down
22 changes: 19 additions & 3 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ class InboundLedgersImp : public InboundLedgers
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// fallingBehind means the last closed ledger is at least 2
// behind the validated ledger. If the node is falling
// behind the network, it probably needs information from
// the network to catch up.
//
// The reason the threshold is not simply 1 behind the
// validated ledger is that a slight lag is the normal case,
// because some nodes get there slightly later than others.
// A difference of 2 means that at least a full ledger
// interval has passed, so the node is beginning to fall
// behind.
bool const fallingBehind = app_.getOPs().isFallingBehind();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
Expand All @@ -134,6 +146,7 @@ class InboundLedgersImp : public InboundLedgers
reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false")
<< ". falling behind: " << (fallingBehind ? "true" : "false")
<< ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq
<< ". Lag leeway: " << lagLeeway
Expand All @@ -144,6 +157,9 @@ class InboundLedgersImp : public InboundLedgers
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the node is falling behind, send requests.
if (fallingBehind)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
Expand All @@ -154,7 +170,7 @@ class InboundLedgersImp : public InboundLedgers
return false;
return true;
}();
ss << ". Would broadcast to peers? "
ss << ". Broadcast to peers? "
<< (shouldBroadcast ? "true." : "false.");

if (!shouldAcquire)
Expand Down Expand Up @@ -189,7 +205,7 @@ class InboundLedgersImp : public InboundLedgers
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
inbound->init(sl, shouldBroadcast);
++mCounter;
}
}
Expand All @@ -202,7 +218,7 @@ class InboundLedgersImp : public InboundLedgers
}

if (!isNew)
inbound->update(seq);
inbound->update(seq, shouldBroadcast);

if (!inbound->isComplete())
{
Expand Down
13 changes: 12 additions & 1 deletion src/ripple/app/ledger/impl/TimeoutCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
});
}

/** Cancel any wait pending on timer_.
 *
 *  Logs, at debug level, the count reported by timer_.cancel() and hands
 *  the same count back to the caller, so a zero result means nothing was
 *  cancelled.
 *
 *  @param sl Lock object supplied by the caller. It is not used in the
 *            body; presumably it documents that the caller already holds
 *            the lock -- confirm against the other TimeoutCounter methods.
 *  @return The value returned by timer_.cancel().
 */
std::size_t
TimeoutCounter::cancelTimer(ScopedLockType& sl)
{
    auto const numCancelled = timer_.cancel();

    JLOG(journal_.debug()) << "Cancelled " << numCancelled << " timer(s)";

    return numCancelled;
}

void
TimeoutCounter::queueJob(ScopedLockType& sl)
{
Expand Down Expand Up @@ -108,7 +116,10 @@ TimeoutCounter::invokeOnTimer()

if (!progress_)
{
++timeouts_;
if (deferred_)
deferred_ = false;
else
++timeouts_;
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
<< " acquiring " << hash_;
onTimer(false, sl);
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/app/ledger/impl/TimeoutCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class TimeoutCounter
void
setTimer(ScopedLockType&);

/** Cancel any waiting timer */
std::size_t
cancelTimer(ScopedLockType&);

/** Queue a job to call invokeOnTimer(). */
void
queueJob(ScopedLockType&);
Expand Down Expand Up @@ -132,6 +136,9 @@ class TimeoutCounter
int timeouts_;
bool complete_;
bool failed_;
/** Whether the initialization deferred doing any work until the first
* timeout. */
bool deferred_ = false;
/** Whether forward progress has been made. */
bool progress_;
/** The minimum time to wait between calls to execute(). */
Expand Down
29 changes: 25 additions & 4 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ class NetworkOPsImp final : public NetworkOPs
clearLedgerFetch() override;
Json::Value
getLedgerFetchInfo() override;
bool
isFallingBehind() const override;
std::uint32_t
acceptLedger(
std::optional<std::chrono::milliseconds> consensusDelay) override;
Expand Down Expand Up @@ -732,6 +734,7 @@ class NetworkOPsImp final : public NetworkOPs
std::atomic<bool> amendmentBlocked_{false};
std::atomic<bool> amendmentWarned_{false};
std::atomic<bool> unlBlocked_{false};
std::atomic<bool> fallingBehind_{false};

ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
boost::asio::steady_timer heartbeatTimer_;
Expand Down Expand Up @@ -1819,13 +1822,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)

auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();

JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
<< " with LCL " << closingInfo.parentHash;

auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
fallingBehind_ = false;
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
{
fallingBehind_ = true;
JLOG(m_journal.warn())
<< "beginConsensus Current ledger " << closingInfo.seq
<< " is at least 2 behind validated "
<< m_ledgerMaster.getValidLedgerIndex();
}

auto const prevLedger =
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);

if (!prevLedger)
{
fallingBehind_ = true;
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
Expand Down Expand Up @@ -1873,7 +1888,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
mLastConsensusPhase = currPhase;
}

JLOG(m_journal.debug()) << "Initiating consensus engine";
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
return true;
}

Expand Down Expand Up @@ -1946,7 +1961,7 @@ NetworkOPsImp::endConsensus()
{
// check if the ledger is good enough to go to FULL
// Note: Do not go to FULL if we don't have the previous ledger
// check if the ledger is bad enough to go to CONNECTE D -- TODO
// check if the ledger is bad enough to go to CONNECTED -- TODO
auto current = m_ledgerMaster.getCurrentLedger();
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2 * current->info().closeTimeResolution))
Expand Down Expand Up @@ -2781,6 +2796,12 @@ NetworkOPsImp::getLedgerFetchInfo()
return app_.getInboundLedgers().getInfo();
}

/** Report the current value of the fallingBehind_ flag.
 *
 *  The flag is an atomic bool that beginConsensus() resets and then sets
 *  when the current ledger trails the validated ledger or the previous
 *  ledger cannot be found.
 *
 *  @return The value of fallingBehind_ at the time of the call.
 */
bool
NetworkOPsImp::isFallingBehind() const
{
    // Explicit load; same default ordering as the implicit conversion.
    return fallingBehind_.load();
}

void
NetworkOPsImp::pubProposedTransaction(
std::shared_ptr<ReadView const> const& ledger,
Expand Down
2 changes: 2 additions & 0 deletions src/ripple/app/misc/NetworkOPs.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
clearLedgerFetch() = 0;
virtual Json::Value
getLedgerFetchInfo() = 0;
virtual bool
isFallingBehind() const = 0;

/** Accepts the current transaction tree, return the new ledger's sequence
Expand Down

0 comments on commit b607ff4

Please sign in to comment.