Skip to content

Commit

Permalink
Optimize when to acquire ledgers from the network.
Browse files Browse the repository at this point in the history
In particular, avoid acquiring ledgers that are likely to be
produced locally very soon.

Derived from XRPLF#4764

Co-authored-by: Mark Travis <[email protected]>
  • Loading branch information
ximinez and mtrippled committed Sep 2, 2024
1 parent 8aaf815 commit 2a5bfc8
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 27 deletions.
24 changes: 22 additions & 2 deletions src/ripple/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class InboundLedger final : public TimeoutCounter,

// Called when another attempt is made to fetch this same ledger
void
update(std::uint32_t seq);
update(std::uint32_t seq, bool broadcast);

/** Returns true if we got all the data. */
bool
Expand Down Expand Up @@ -90,7 +90,7 @@ class InboundLedger final : public TimeoutCounter,
bool
checkLocal();
void
init(ScopedLockType& collectionLock);
init(ScopedLockType& collectionLock, bool broadcast);

bool
gotData(
Expand Down Expand Up @@ -197,6 +197,26 @@ class InboundLedger final : public TimeoutCounter,
std::unique_ptr<PeerSet> mPeerSet;
};

/** Render an InboundLedger::Reason as a human-readable string for logging.

    Returns the enumerator's name, or "unknown" (asserting in debug
    builds) if an unrecognized value is passed.
*/
inline std::string
to_string(InboundLedger::Reason reason)
{
    switch (reason)
    {
        case InboundLedger::Reason::HISTORY:
            return "HISTORY";
        case InboundLedger::Reason::SHARD:
            return "SHARD";
        case InboundLedger::Reason::GENERIC:
            return "GENERIC";
        case InboundLedger::Reason::CONSENSUS:
            return "CONSENSUS";
        default:
            assert(false);
            return "unknown";
    }
}

} // namespace ripple

#endif
45 changes: 40 additions & 5 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ InboundLedger::InboundLedger(
}

void
InboundLedger::init(ScopedLockType& collectionLock)
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
{
ScopedLockType sl(mtx_);
collectionLock.unlock();
Expand Down Expand Up @@ -148,8 +148,18 @@ InboundLedger::init(ScopedLockType& collectionLock)
}
if (!complete_)
{
addPeers();
queueJob(sl);
if (broadcast)
{
addPeers();
queueJob(sl);
}
else
{
// Delay to give time to build the ledger before sending
JLOG(journal_.debug()) << "init: Deferring peer requests";
deferred_ = true;
setTimer(sl);
}
return;
}

Expand Down Expand Up @@ -180,7 +190,7 @@ InboundLedger::getPeerCount() const
}

void
InboundLedger::update(std::uint32_t seq)
InboundLedger::update(std::uint32_t seq, bool broadcast)
{
ScopedLockType sl(mtx_);

Expand All @@ -190,6 +200,24 @@ InboundLedger::update(std::uint32_t seq)

// Prevent this from being swept
touch();

// If the signal is to broadcast, and this request has never tried to
// broadcast before, cancel any waiting timer, then fire off the job to
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
// getPeerCount(), because the latter will filter out peers that have been
// tried, but are since lost. This wants to check if peers have _ever_ been
// tried. If they have, stick with the normal timer flow.
if (broadcast && mPeerSet->getPeerIds().empty())
{
if (cancelTimer(sl))
{
JLOG(journal_.debug())
<< "update: cancelling timer to send peer requests";
deferred_ = false;
addPeers();
queueJob(sl);
}
}
}

bool
Expand Down Expand Up @@ -428,7 +456,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)

if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
assert(isDone());
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}

mByHash = true;

Expand Down
124 changes: 114 additions & 10 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,114 @@ class InboundLedgersImp : public InboundLedgers
reason != InboundLedger::Reason::SHARD ||
(seq != 0 && app_.getShardStore()));

// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
bool const shouldAcquire = [&]() {
if (!app_.getOPs().isNeedNetworkLedger())
return true;
if (reason == InboundLedger::Reason::GENERIC)
return true;
if (reason == InboundLedger::Reason::CONSENSUS)
return true;
return false;
}();
assert(
shouldAcquire ==
!(app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS)));

std::stringstream ss;
ss << "InboundLedger::acquire: "
<< "Request: " << to_string(hash) << ", " << seq
<< " NeedNetworkLedger: "
<< (app_.getOPs().isNeedNetworkLedger() ? "yes" : "no")
<< " Reason: " << to_string(reason)
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");

/* Acquiring ledgers is somewhat expensive. It requires lots of
* computation and network communication. Avoid it when it's not
* appropriate. Every validation from a peer for a ledger that
* we do not have locally results in a call to this function: even
* if we are moments away from validating the same ledger.
*/
bool const shouldBroadcast = [&]() {
// If the node is not in "full" state, it needs to sync to
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// fallingBehind means the last closed ledger is at least 2
// behind the validated ledger. If the node is falling
// behind the network, it probably needs information from
// the network to catch up.
//
// The reason this should not simply be only at least 1
// behind the validated ledger is that a slight lag is
// normal case because some nodes get there slightly later
// than others. A difference of 2 means that at least a full
// ledger interval has passed, so the node is beginning to
// fall behind.
bool const fallingBehind = app_.getOPs().isFallingBehind();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's more than 20 in the future, this node should request
// it so that it can jump ahead and get caught up.
LedgerIndex const validSeq =
app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture =
(seq > validSeq) && (seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
// will be false other than on a brand new network.)
bool const consensus =
reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false")
<< ". falling behind: " << (fallingBehind ? "true" : "false")
<< ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq
<< ". Lag leeway: " << lagLeeway
<< ". request for near future ledger: "
<< (nearFuture ? "true" : "false")
<< ". Consensus: " << (consensus ? "true" : "false");

// If the node is not synced, send requests.
if (!isFull)
return true;
// If the node is falling behind, send requests.
if (fallingBehind)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
return false;
// If the request is because of consensus, do NOT send requests.
// This node is probably about to build it.
if (consensus)
return false;
return true;
}();
ss << ". Broadcast to peers? "
<< (shouldBroadcast ? "true." : "false.");

if (!shouldAcquire)
{
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
return {};
}

bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}

Expand All @@ -109,19 +205,26 @@ class InboundLedgersImp : public InboundLedgers
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
inbound->init(sl, shouldBroadcast);
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");

if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}

if (!isNew)
inbound->update(seq);
inbound->update(seq, shouldBroadcast);

if (!inbound->isComplete())
{
JLOG(j_.debug()) << "Incomplete: " << ss.str();
return {};
}

if (reason == InboundLedger::Reason::HISTORY)
{
Expand All @@ -134,21 +237,22 @@ class InboundLedgersImp : public InboundLedgers
if (!shardStore)
{
JLOG(j_.error())
<< "Acquiring shard with no shard store available";
<< "Acquiring shard with no shard store available"
<< ss.str();
return {};
}
if (inbound->getLedger()->stateMap().family().isShardBacked())
shardStore->setStored(inbound->getLedger());
else
shardStore->storeLedger(inbound->getLedger());
}

JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
return perf::measureDurationAndLog(
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);

return ledger;
}

void
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,8 +1114,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
}

JLOG(m_journal.info()) << "Advancing accepted ledger to "
<< ledger->info().seq << " with >= " << minVal
<< " validations";
<< ledger->info().seq << " ("
<< to_short_string(ledger->info().hash)
<< ") with >= " << minVal << " validations";

ledger->setValidated();
ledger->setFull();
Expand Down
20 changes: 19 additions & 1 deletion src/ripple/app/ledger/impl/TimeoutCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_;
timer_.expires_after(timerInterval_);
timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
Expand All @@ -61,11 +62,25 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
ScopedLockType sl(ptr->mtx_);
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec << " (operation_aborted: "
<< boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted"
: "other")
<< ")";
ptr->queueJob(sl);
}
});
}

std::size_t
TimeoutCounter::cancelTimer(ScopedLockType& sl)
{
auto const ret = timer_.cancel();
JLOG(journal_.debug()) << "Cancelled " << ret << " timer(s)";
return ret;
}

void
TimeoutCounter::queueJob(ScopedLockType& sl)
{
Expand Down Expand Up @@ -100,7 +115,10 @@ TimeoutCounter::invokeOnTimer()

if (!progress_)
{
++timeouts_;
if (deferred_)
deferred_ = false;
else
++timeouts_;
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
<< " acquiring " << hash_;
onTimer(false, sl);
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/app/ledger/impl/TimeoutCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class TimeoutCounter
void
setTimer(ScopedLockType&);

/** Cancel any waiting timer */
std::size_t
cancelTimer(ScopedLockType&);

/** Queue a job to call invokeOnTimer(). */
void
queueJob(ScopedLockType&);
Expand Down Expand Up @@ -132,6 +136,9 @@ class TimeoutCounter
int timeouts_;
bool complete_;
bool failed_;
/** Whether the initialization deferred doing any work until the first
* timeout. */
bool deferred_ = false;
/** Whether forward progress has been made. */
bool progress_;
/** The minimum time to wait between calls to execute(). */
Expand Down
Loading

0 comments on commit 2a5bfc8

Please sign in to comment.