Skip to content

Commit

Permalink
Apply transaction batches in periodic intervals.
Browse files Browse the repository at this point in the history
Add new transaction submission API field, "sync", which
determines behavior of the server while submitting transactions:
1) sync (default): Process transactions in a batch immediately,
   and return only once the transaction has been processed.
2) async: Put transaction into the batch for the next processing
   interval and return immediately.
3) wait: Put transaction into the batch for the next processing
   interval and return only after it is processed.
  • Loading branch information
mtrippled committed Apr 21, 2023
1 parent c500396 commit 5491a1e
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 145 deletions.
1 change: 1 addition & 0 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ ApplicationImp::start(bool withTimers)
{
setSweepTimer();
setEntropyTimer();
m_networkOPs->setBatchApplyTimer();
}

m_io_latency_sampler.start();
Expand Down
208 changes: 97 additions & 111 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class NetworkOPsImp final : public NetworkOPs
, heartbeatTimer_(io_svc)
, clusterTimer_(io_svc)
, accountHistoryTxTimer_(io_svc)
, batchApplyTimer_(io_svc)
, mConsensus(
app,
make_FeeVote(
Expand Down Expand Up @@ -280,43 +281,12 @@ class NetworkOPsImp final : public NetworkOPs
processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType) override;

/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
*
* @param transaction Transaction object.
* @param bUnliimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType);

/**
* For transactions not submitted by a locally connected client, fire and
* forget. Add to batch and trigger it to be processed if there's no batch
* currently being applied.
*
* @param transaction Transaction object
* @param bUnlimited Whether a privileged client connection submitted it.
* @param failType fail_hard setting from transaction submission.
*/
void
doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failtype);

/**
* Apply transactions in batches. Continue until none are queued.
*/
void
transactionBatch();
bool
transactionBatch(bool const drain) override;

/**
* Attempt to apply transactions and post-process based on the results.
Expand Down Expand Up @@ -590,6 +560,15 @@ class NetworkOPsImp final : public NetworkOPs
<< "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message();
}

ec.clear();
batchApplyTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: batchApplyTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done.
using namespace std::chrono_literals;
Expand Down Expand Up @@ -708,6 +687,9 @@ class NetworkOPsImp final : public NetworkOPs
void
setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo);

void
setBatchApplyTimer() override;

Application& app_;
beast::Journal m_journal;

Expand All @@ -726,6 +708,8 @@ class NetworkOPsImp final : public NetworkOPs
boost::asio::steady_timer heartbeatTimer_;
boost::asio::steady_timer clusterTimer_;
boost::asio::steady_timer accountHistoryTxTimer_;
//! This timer is for for applying transaction batches.
boost::asio::steady_timer batchApplyTimer_;

RCLConsensus mConsensus;

Expand Down Expand Up @@ -1000,6 +984,37 @@ NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
[this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
}

void
NetworkOPsImp::setBatchApplyTimer()
{
using namespace std::chrono_literals;
static auto const batchInterval = 100ms;

setTimer(
batchApplyTimer_,
batchInterval,
[this]() {
std::unique_lock lock(mMutex);
// Only do the job if there's work to do and it's not currently
// being done.
if (mTransactions.size() && mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
transactionBatch(false);
}))
{
mDispatchState = DispatchState::scheduled;
}
}
else
{
lock.unlock();
setBatchApplyTimer();
}
},
[this]() { setBatchApplyTimer(); });
}

void
NetworkOPsImp::processHeartbeatTimer()
{
Expand Down Expand Up @@ -1176,14 +1191,16 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)

m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
auto t = tx;
processTransaction(t, false, false, FailHard::no);
processTransaction(
t, false, RPC::SubmitSync::async, false, FailHard::no);
});
}

void
NetworkOPsImp::processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType)
{
Expand Down Expand Up @@ -1213,7 +1230,7 @@ NetworkOPsImp::processTransaction(
// Not concerned with local checks at this point.
if (validity == Validity::SigBad)
{
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
JLOG(m_journal.trace()) << "Transaction has bad signature: " << reason;
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
Expand All @@ -1223,89 +1240,59 @@ NetworkOPsImp::processTransaction(
// canonicalize can change our pointer
app_.getMasterTransaction().canonicalize(&transaction);

if (bLocal)
doTransactionSync(transaction, bUnlimited, failType);
else
doTransactionAsync(transaction, bUnlimited, failType);
}

void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{
std::lock_guard lock(mMutex);

if (transaction->getApplying())
return;

mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, false, failType));
transaction->setApplying();

if (mDispatchState == DispatchState::none)
{
if (m_job_queue.addJob(
jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
{
mDispatchState = DispatchState::scheduled;
}
}
}

void
NetworkOPsImp::doTransactionSync(
std::shared_ptr<Transaction> transaction,
bool bUnlimited,
FailHard failType)
{
std::unique_lock<std::mutex> lock(mMutex);

std::unique_lock lock(mMutex);
if (!transaction->getApplying())
{
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, true, failType));
transaction->setApplying();
mTransactions.push_back(
TransactionStatus(transaction, bUnlimited, bLocal, failType));
}

do
switch (sync)
{
if (mDispatchState == DispatchState::running)
{
// A batch processing job is already running, so wait.
mCond.wait(lock);
}
else
{
apply(lock);

if (mTransactions.size())
case RPC::SubmitSync::sync:
do
{
// More transactions need to be applied, but by another job.
if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
transactionBatch();
}))
{
mDispatchState = DispatchState::scheduled;
}
}
}
} while (transaction->getApplying());
// If a batch is being processed, then wait. Otherwise,
// process a batch.
if (mDispatchState == DispatchState::running)
mCond.wait(lock);
else
apply(lock);
} while (transaction->getApplying());
break;

case RPC::SubmitSync::async:
// Ensure that the tx object is returned only after being
// processed, so the client can expect the terSUBMITTED
// result in all cases.
transaction = std::make_shared<Transaction>(*transaction);
transaction->setResult(terSUBMITTED);
break;

case RPC::SubmitSync::wait:
mCond.wait(
lock, [&transaction] { return !transaction->getApplying(); });
break;

default:
assert(false);
}
}

void
NetworkOPsImp::transactionBatch()
bool
NetworkOPsImp::transactionBatch(bool const drain)
{
std::unique_lock<std::mutex> lock(mMutex);

if (mDispatchState == DispatchState::running)
return;

while (mTransactions.size())
{
apply(lock);
std::unique_lock<std::mutex> lock(mMutex);
if (mDispatchState == DispatchState::running || mTransactions.empty())
return false;

do
apply(lock);
while (drain && mTransactions.size());
}
setBatchApplyTimer();
return true;
}

void
Expand All @@ -1314,9 +1301,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
std::vector<TransactionStatus> submit_held;
std::vector<TransactionStatus> transactions;
mTransactions.swap(transactions);
assert(!transactions.empty());

assert(mDispatchState != DispatchState::running);
mDispatchState = DispatchState::running;

batchLock.unlock();
Expand Down Expand Up @@ -1700,7 +1684,9 @@ NetworkOPsImp::checkLastClosedLedger(
switchLedgers = false;
}
else
{
networkClosed = closedLedger;
}

if (!switchLedgers)
return false;
Expand Down
38 changes: 32 additions & 6 deletions src/ripple/app/misc/NetworkOPs.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <ripple/net/InfoSub.h>
#include <ripple/protocol/STValidation.h>
#include <ripple/protocol/messages.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <boost/asio.hpp>
#include <deque>
#include <memory>
Expand Down Expand Up @@ -71,6 +72,10 @@ enum class OperatingMode {
FULL = 4 //!< we have the ledger and can even validate
};

namespace RPC {
enum class SubmitSync;
}

/** Provides server functionality for clients.
Clients include backend applications, local commands, and connected
Expand Down Expand Up @@ -123,22 +128,41 @@ class NetworkOPs : public InfoSub::Source
virtual void
submitTransaction(std::shared_ptr<STTx const> const&) = 0;

/**
* Process transactions as they arrive from the network or which are
* submitted by clients. Process local transactions synchronously
/** Process a transaction.
*
* The transaction has been submitted either from the peer network or
* from a client. For client submissions, there are 3 distinct behaviors:
* 1) sync (default): process transactions in a batch immediately,
* and return only once the transaction has been processed.
* 2) async: Put transaction into the batch for the next processing
* interval and return immediately.
* 3) wait: Put transaction into the batch for the next processing
* interval and return only after it is processed.
*
* @param transaction Transaction object
* @param transaction Transaction object.
* @param bUnlimited Whether a privileged client connection submitted it.
* @param bLocal Client submission.
* @param failType fail_hard setting from transaction submission.
* @param sync Client submission synchronous behavior type requested.
* @param bLocal Whether submitted by client (local) or peer.
* @param failType Whether to fail hard or not.
*/
virtual void
processTransaction(
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
RPC::SubmitSync sync,
bool bLocal,
FailHard failType) = 0;

/** Apply transactions in batches.
*
* Only a single batch unless drain is set.
*
* @param drain Whether to process batches until none remain.
* @return Whether any transactions were processed.
*/
virtual bool
transactionBatch(bool const drain) = 0;

//--------------------------------------------------------------------------
//
// Owner functions
Expand Down Expand Up @@ -187,6 +211,8 @@ class NetworkOPs : public InfoSub::Source
setStandAlone() = 0;
virtual void
setStateTimer() = 0;
virtual void
setBatchApplyTimer() = 0;

virtual void
setNeedNetworkLedger() = 0;
Expand Down
Loading

0 comments on commit 5491a1e

Please sign in to comment.