Skip to content

Commit

Permalink
Negative cache support for node store
Browse files Browse the repository at this point in the history
Adds support to TaggedCache to support smart replacement
(Needed to avoid race conditions with negative caching.)

Create a "hotDUMMY" object that represents the absence
of an object.

Allow DatabaseNodeImp::asyncFetch to complete immediately
if object is in cache (positive or negative).

Fix a bug in asyncFetch where the object returned may not
be the correct canonical version because we stash the
object in the results array before we canonicalize it.
  • Loading branch information
JoelKatz authored and manojsdoshi committed Mar 30, 2022
1 parent b9903bb commit 3eb8aa8
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 20 deletions.
24 changes: 12 additions & 12 deletions src/ripple/basics/TaggedCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,16 @@ class TaggedCache
@param key The key corresponding to the object
@param data A shared pointer to the data corresponding to the object.
@param replace `true` if `data` is the up to date version of the object.
@param replace Function that decides if cache should be replaced
@return `true` If the key already existed.
*/
private:
template <bool replace>
public:
bool
canonicalize(
const key_type& key,
std::conditional_t<
replace,
std::shared_ptr<T> const,
std::shared_ptr<T>>& data)
std::shared_ptr<T>& data,
std::function<bool(std::shared_ptr<T> const&)>&& replace)
{
// Return canonical value, store if needed, refresh in cache
// Return values: true=we had the data already
Expand All @@ -335,7 +332,7 @@ class TaggedCache

if (entry.isCached())
{
if constexpr (replace)
if (replace(entry.ptr))
{
entry.ptr = data;
entry.weak_ptr = data;
Expand All @@ -352,7 +349,7 @@ class TaggedCache

if (cachedData)
{
if constexpr (replace)
if (replace(entry.ptr))
{
entry.ptr = data;
entry.weak_ptr = data;
Expand All @@ -374,19 +371,22 @@ class TaggedCache
return false;
}

public:
bool
canonicalize_replace_cache(
const key_type& key,
std::shared_ptr<T> const& data)
{
return canonicalize<true>(key, data);
return canonicalize(
key,
const_cast<std::shared_ptr<T>&>(data),
[](std::shared_ptr<T> const&) { return true; });
}

bool
canonicalize_replace_client(const key_type& key, std::shared_ptr<T>& data)
{
return canonicalize<false>(key, data);
return canonicalize(
key, data, [](std::shared_ptr<T> const&) { return false; });
}

std::shared_ptr<T>
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class Database
object is stored, used by the shard store.
@param callback Callback function when read completes
*/
void
virtual void
asyncFetch(
uint256 const& hash,
std::uint32_t ledgerSeq,
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/nodestore/NodeObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ enum NodeObjectType : std::uint32_t {
hotUNKNOWN = 0,
hotLEDGER = 1,
hotACCOUNT_NODE = 3,
hotTRANSACTION_NODE = 4
hotTRANSACTION_NODE = 4,
hotDUMMY = 512 // an invalid or missing object
};

/** A simple object that the Ledger uses to store entries.
Expand Down
59 changes: 53 additions & 6 deletions src/ripple/nodestore/impl/DatabaseNodeImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,34 @@ DatabaseNodeImp::store(
{
storeStats(1, data.size());

backend_->store(NodeObject::createObject(type, std::move(data), hash));
auto obj = NodeObject::createObject(type, std::move(data), hash);
backend_->store(obj);
if (cache_)
{
// After the store, replace a negative cache entry if there is one
cache_->canonicalize(
hash, obj, [](std::shared_ptr<NodeObject> const& n) {
return n->getType() == hotDUMMY;
});
}
}

void
DatabaseNodeImp::asyncFetch(
uint256 const& hash,
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
{
if (cache_)
{
std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
if (obj)
{
callback(obj->getType() == hotDUMMY ? nullptr : obj);
return;
}
}
Database::asyncFetch(hash, ledgerSeq, std::move(callback));
}

void
Expand Down Expand Up @@ -75,8 +102,19 @@ DatabaseNodeImp::fetchNodeObject(
switch (status)
{
case ok:
if (nodeObject && cache_)
cache_->canonicalize_replace_client(hash, nodeObject);
if (cache_)
{
if (nodeObject)
cache_->canonicalize_replace_client(hash, nodeObject);
else
{
auto notFound =
NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nodeObject = notFound;
}
}
break;
case notFound:
break;
Expand All @@ -95,6 +133,8 @@ DatabaseNodeImp::fetchNodeObject(
{
JLOG(j_.trace()) << "fetchNodeObject " << hash
<< ": record found in cache";
if (nodeObject->getType() == hotDUMMY)
nodeObject.reset();
}

if (nodeObject)
Expand Down Expand Up @@ -127,7 +167,7 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
}
else
{
results[i] = nObj;
results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
// It was in the cache.
++hits;
}
Expand All @@ -140,9 +180,8 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)

for (size_t i = 0; i < dbResults.size(); ++i)
{
auto nObj = dbResults[i];
auto nObj = std::move(dbResults[i]);
size_t index = indexMap[cacheMisses[i]];
results[index] = nObj;
auto const& hash = hashes[index];

if (nObj)
Expand All @@ -156,7 +195,15 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
JLOG(j_.error())
<< "fetchBatch - "
<< "record not found in db or cache. hash = " << strHex(hash);
if (cache_)
{
auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nObj = std::move(notFound);
}
}
results[index] = std::move(nObj);
}

auto fetchDurationUs =
Expand Down
8 changes: 8 additions & 0 deletions src/ripple/nodestore/impl/DatabaseNodeImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class DatabaseNodeImp : public Database
// only one database
return true;
}

void
sync() override
{
Expand All @@ -120,6 +121,13 @@ class DatabaseNodeImp : public Database
std::vector<std::shared_ptr<NodeObject>>
fetchBatch(std::vector<uint256> const& hashes);

void
asyncFetch(
uint256 const& hash,
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
override;

bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{
Expand Down

0 comments on commit 3eb8aa8

Please sign in to comment.