Skip to content

Commit

Permalink
Simplify and improve order book tracking:
Browse files Browse the repository at this point in the history
- Avoid using std::shared_ptr
- Prefer using unordered maps to avoid linear searches
- Increase the interval between full order book updates
  • Loading branch information
nbougalis committed Mar 29, 2022
1 parent 48803a4 commit b9903bb
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 265 deletions.
257 changes: 118 additions & 139 deletions src/ripple/app/ledger/OrderBookDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/OrderBookDB.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/Log.h>
#include <ripple/core/Config.h>
#include <ripple/core/JobQueue.h>
Expand All @@ -28,70 +29,72 @@
namespace ripple {

OrderBookDB::OrderBookDB(Application& app)
: app_(app), mSeq(0), j_(app.journal("OrderBookDB"))
: app_(app), seq_(0), j_(app.journal("OrderBookDB"))
{
}

void
OrderBookDB::invalidate()
{
std::lock_guard sl(mLock);
mSeq = 0;
}

void
OrderBookDB::setup(std::shared_ptr<ReadView const> const& ledger)
{
if (!app_.config().standalone() && app_.getOPs().isNeedNetworkLedger())
{
std::lock_guard sl(mLock);
auto seq = ledger->info().seq;
JLOG(j_.warn()) << "Eliding full order book update: no ledger";
return;
}

// Do a full update every 256 ledgers
if (mSeq != 0)
{
if (seq == mSeq)
return;
if ((seq > mSeq) && ((seq - mSeq) < 256))
return;
if ((seq < mSeq) && ((mSeq - seq) < 16))
return;
}
auto seq = seq_.load();

JLOG(j_.debug()) << "Advancing from " << mSeq << " to " << seq;
if (seq != 0)
{
if ((seq > ledger->seq()) && ((ledger->seq() - seq) < 25600))
return;

mSeq = seq;
if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
return;
}

if (seq_.exchange(ledger->seq()) != seq)
return;

JLOG(j_.debug()) << "Full order book update: " << seq << " to "
<< ledger->seq();

if (app_.config().PATH_SEARCH_MAX != 0)
{
if (app_.config().standalone())
update(ledger);
else
app_.getJobQueue().addJob(
jtUPDATE_PF, "OrderBookDB::update", [this, ledger]() {
update(ledger);
});
jtUPDATE_PF,
"OrderBookDB::update: " + std::to_string(ledger->seq()),
[this, ledger]() { update(ledger); });
}
}

void
OrderBookDB::update(std::shared_ptr<ReadView const> const& ledger)
{
hash_set<uint256> seen;
OrderBookDB::IssueToOrderBook destMap;
OrderBookDB::IssueToOrderBook sourceMap;
hash_set<Issue> XRPBooks;

JLOG(j_.debug()) << "OrderBookDB::update>";

if (app_.config().PATH_SEARCH_MAX == 0)
return; // pathfinding has been disabled

// A newer full update job is pending
if (auto const seq = seq_.load(); seq > ledger->seq())
{
// pathfinding has been disabled
JLOG(j_.debug()) << "Eliding update for " << ledger->seq()
<< " because of pending update to later " << seq;
return;
}

decltype(allBooks_) allBooks;
decltype(xrpBooks_) xrpBooks;

allBooks.reserve(allBooks_.size());
xrpBooks.reserve(xrpBooks_.size());

JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";

// walk through the entire ledger looking for orderbook entries
int books = 0;
int cnt = 0;

try
{
Expand All @@ -100,9 +103,8 @@ OrderBookDB::update(std::shared_ptr<ReadView const> const& ledger)
if (app_.isStopping())
{
JLOG(j_.info())
<< "OrderBookDB::update exiting due to isStopping";
std::lock_guard sl(mLock);
mSeq = 0;
<< "Update halted because the process is stopping";
seq_.store(0);
return;
}

Expand All @@ -111,101 +113,89 @@ OrderBookDB::update(std::shared_ptr<ReadView const> const& ledger)
sle->getFieldH256(sfRootIndex) == sle->key())
{
Book book;

book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
book.out.account = sle->getFieldH160(sfTakerGetsIssuer);

allBooks[book.in].insert(book.out);

if (isXRP(book.out))
xrpBooks.insert(book.in);

uint256 index = getBookBase(book);
if (seen.insert(index).second)
{
auto orderBook = std::make_shared<OrderBook>(index, book);
sourceMap[book.in].push_back(orderBook);
destMap[book.out].push_back(orderBook);
if (isXRP(book.out))
XRPBooks.insert(book.in);
++books;
}
++cnt;
}
}
}
catch (SHAMapMissingNode const& mn)
{
JLOG(j_.info()) << "OrderBookDB::update: " << mn.what();
std::lock_guard sl(mLock);
mSeq = 0;
JLOG(j_.info()) << "Missing node in " << ledger->seq()
<< " during update: " << mn.what();
seq_.store(0);
return;
}

JLOG(j_.debug()) << "OrderBookDB::update< " << books << " books found";
JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt
<< " books found";

{
std::lock_guard sl(mLock);

mXRPBooks.swap(XRPBooks);
mSourceMap.swap(sourceMap);
mDestMap.swap(destMap);
allBooks_.swap(allBooks);
xrpBooks_.swap(xrpBooks);
}

app_.getLedgerMaster().newOrderBookDB();
}

void
OrderBookDB::addOrderBook(Book const& book)
{
bool toXRP = isXRP(book.out);

std::lock_guard sl(mLock);

if (toXRP)
{
// We don't want to search through all the to-XRP or from-XRP order
// books!
for (auto ob : mSourceMap[book.in])
{
if (isXRP(ob->getCurrencyOut())) // also to XRP
return;
}
}
else
{
for (auto ob : mDestMap[book.out])
{
if (ob->getCurrencyIn() == book.in.currency &&
ob->getIssuerIn() == book.in.account)
{
return;
}
}
}
uint256 index = getBookBase(book);
auto orderBook = std::make_shared<OrderBook>(index, book);
allBooks_[book.in].insert(book.out);

mSourceMap[book.in].push_back(orderBook);
mDestMap[book.out].push_back(orderBook);
if (toXRP)
mXRPBooks.insert(book.in);
xrpBooks_.insert(book.in);
}

// return list of all orderbooks that want this issuerID and currencyID
OrderBook::List
std::vector<Book>
OrderBookDB::getBooksByTakerPays(Issue const& issue)
{
std::lock_guard sl(mLock);
auto it = mSourceMap.find(issue);
return it == mSourceMap.end() ? OrderBook::List() : it->second;
std::vector<Book> ret;

{
std::lock_guard sl(mLock);

if (auto it = allBooks_.find(issue); it != allBooks_.end())
{
ret.reserve(it->second.size());

for (auto const& gets : it->second)
ret.push_back(Book(issue, gets));
}
}

return ret;
}

int
OrderBookDB::getBookSize(Issue const& issue)
{
std::lock_guard sl(mLock);
auto it = mSourceMap.find(issue);
return it == mSourceMap.end() ? 0 : it->second.size();
if (auto it = allBooks_.find(issue); it != allBooks_.end())
return static_cast<int>(it->second.size());
return 0;
}

bool
OrderBookDB::isBookToXRP(Issue const& issue)
{
std::lock_guard sl(mLock);
return mXRPBooks.count(issue) > 0;
return xrpBooks_.count(issue) > 0;
}

BookListeners::pointer
Expand Down Expand Up @@ -247,63 +237,52 @@ OrderBookDB::processTxn(
Json::Value const& jvObj)
{
std::lock_guard sl(mLock);
if (alTx.getResult() == tesSUCCESS)

// For this particular transaction, maintain the set of unique
// subscriptions that have already published it. This prevents sending
// the transaction multiple times if it touches multiple ltOFFER
// entries for the same book, or if it touches multiple books and a
// single client has subscribed to those books.
hash_set<std::uint64_t> havePublished;

// Check if this is an offer or an offer cancel or a payment that
// consumes an offer.
// Check to see what the meta looks like.
for (auto& node : alTx.getMeta()->getNodes())
{
// For this particular transaction, maintain the set of unique
// subscriptions that have already published it. This prevents sending
// the transaction multiple times if it touches multiple ltOFFER
// entries for the same book, or if it touches multiple books and a
// single client has subscribed to those books.
hash_set<std::uint64_t> havePublished;

// Check if this is an offer or an offer cancel or a payment that
// consumes an offer.
// Check to see what the meta looks like.
for (auto& node : alTx.getMeta()->getNodes())
try
{
try
if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
{
if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
{
SField const* field = nullptr;

// We need a field that contains the TakerGets and TakerPays
// parameters.
if (node.getFName() == sfModifiedNode)
field = &sfPreviousFields;
else if (node.getFName() == sfCreatedNode)
field = &sfNewFields;
else if (node.getFName() == sfDeletedNode)
field = &sfFinalFields;

if (field)
auto process = [&, this](SField const& field) {
if (auto data = dynamic_cast<STObject const*>(
node.peekAtPField(field));
data && data->isFieldPresent(sfTakerPays) &&
data->isFieldPresent(sfTakerGets))
{
auto data = dynamic_cast<const STObject*>(
node.peekAtPField(*field));

if (data && data->isFieldPresent(sfTakerPays) &&
data->isFieldPresent(sfTakerGets))
{
// determine the OrderBook
Book b{
data->getFieldAmount(sfTakerGets).issue(),
data->getFieldAmount(sfTakerPays).issue()};

auto listeners = getBookListeners(b);
if (listeners)
{
listeners->publish(jvObj, havePublished);
}
}
auto listeners = getBookListeners(
{data->getFieldAmount(sfTakerGets).issue(),
data->getFieldAmount(sfTakerPays).issue()});
if (listeners)
listeners->publish(jvObj, havePublished);
}
}
}
catch (std::exception const&)
{
JLOG(j_.info())
<< "Fields not found in OrderBookDB::processTxn";
};

// We need a field that contains the TakerGets and TakerPays
// parameters.
if (node.getFName() == sfModifiedNode)
process(sfPreviousFields);
else if (node.getFName() == sfCreatedNode)
process(sfNewFields);
else if (node.getFName() == sfDeletedNode)
process(sfFinalFields);
}
}
catch (std::exception const& ex)
{
JLOG(j_.info())
<< "processTxn: field not found (" << ex.what() << ")";
}
}
}

Expand Down
Loading

0 comments on commit b9903bb

Please sign in to comment.