Skip to content

Commit

Permalink
core: implement background trie prefetcher
Browse files Browse the repository at this point in the history
Squashed from the following commits:

core/state: lazily init snapshot storage map
core/state: fix flawed meter on storage reads
core/state: make statedb/stateobjects reuse a hasher
core/blockchain, core/state: implement new trie prefetcher
core: make trie prefetcher deliver tries to statedb
core/state: refactor trie_prefetcher, export storage tries
blockchain: re-enable the next-block-prefetcher
state: remove panics in trie prefetcher
core/state/trie_prefetcher: address some review concerns

sq
  • Loading branch information
holiman authored and karalabe committed Jan 20, 2021
1 parent 93a89b2 commit 1e1865b
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 34 deletions.
27 changes: 20 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,12 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
triePrefetcher *state.TriePrefetcher // Trie prefetcher interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -249,6 +250,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
tp := state.NewTriePrefetcher(bc.stateCache)

bc.wg.Add(1)
go func() {
tp.Loop()
bc.wg.Done()
}()
bc.triePrefetcher = tp

bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
Expand Down Expand Up @@ -991,6 +1001,9 @@ func (bc *BlockChain) Stop() {
bc.scope.Close()
close(bc.quit)
bc.StopInsert()
if bc.triePrefetcher != nil {
bc.triePrefetcher.Close()
}
bc.wg.Wait()

// Ensure that the entirety of the state snapshot is journalled to disk.
Expand Down Expand Up @@ -1857,6 +1870,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb.UsePrefetcher(bc.triePrefetcher)
if err != nil {
return it.index, err
}
Expand Down Expand Up @@ -1891,8 +1905,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates

Expand Down
12 changes: 10 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,20 @@ type cachingDB struct {

// OpenTrie opens the main account trie at a specific root hash.
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// CopyTrie returns an independent copy of the given trie.
Expand Down
72 changes: 52 additions & 20 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,20 @@ func (s *stateObject) touch() {

func (s *stateObject) getTrie(db Database) Trie {
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
// Try fetching from prefetcher first
// We don't prefetch empty tries
if s.data.Root != emptyRoot && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no
// prefetcher
s.trie = s.db.prefetcher.GetTrie(s.data.Root)
}
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
}
}
}
return s.trie
Expand Down Expand Up @@ -197,12 +206,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
enc []byte
err error
meter *time.Duration
)
readStart := time.Now()
if metrics.EnabledExpensive {
// If the snap is 'under construction', the first lookup may fail. If that
// happens, we don't want to double-count the time elapsed. Thus this
// dance with the metering.
defer func() {
if meter != nil {
*meter += time.Since(readStart)
}
}()
}
if s.db.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
meter = &s.db.SnapshotStorageReads
}
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
Expand All @@ -217,8 +238,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If snapshot unavailable or reading from it failed, load from the database
if s.db.snap == nil || err != nil {
if meter != nil {
// If we already spent time checking the snapshot, account for it
// and reset the readStart
*meter += time.Since(readStart)
readStart = time.Now()
}
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
meter = &s.db.StorageReads
}
if enc, err = s.getTrie(db).TryGet(key.Bytes()); err != nil {
s.setError(err)
Expand Down Expand Up @@ -283,8 +310,13 @@ func (s *stateObject) setState(key, value common.Hash) {
// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise() {
trieChanges := make([]common.Hash, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
trieChanges = append(trieChanges, key)
}
if len(trieChanges) > 0 && s.db.prefetcher != nil && s.data.Root != emptyRoot {
s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -303,18 +335,11 @@ func (s *stateObject) updateTrie(db Database) Trie {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// Retrieve the snapshot storage map for the object
// The snapshot storage map for the object
var storage map[common.Hash][]byte
if s.db.snap != nil {
// Retrieve the old storage map, if available, create a new one otherwise
storage = s.db.snapStorage[s.addrHash]
if storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand All @@ -331,8 +356,15 @@ func (s *stateObject) updateTrie(db Database) Trie {
s.setError(tr.TryUpdate(key[:], v))
}
// If state snapshotting is active, cache the data til commit
if storage != nil {
storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00
if s.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
}
}
if len(s.pendingStorage) > 0 {
Expand Down
45 changes: 42 additions & 3 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ func (n *proofList) Delete(key []byte) error {
// * Contracts
// * Accounts
type StateDB struct {
db Database
trie Trie
db Database
prefetcher *TriePrefetcher
originalRoot common.Hash // The pre-state root, before any changes were made
trie Trie
hasher crypto.KeccakState

snaps *snapshot.Tree
snap snapshot.Snapshot
Expand Down Expand Up @@ -125,6 +128,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
sdb := &StateDB{
db: db,
trie: tr,
originalRoot: root,
snaps: snaps,
stateObjects: make(map[common.Address]*stateObject),
stateObjectsPending: make(map[common.Address]struct{}),
Expand All @@ -133,6 +137,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
accessList: newAccessList(),
hasher: crypto.NewKeccakState(),
}
if sdb.snaps != nil {
if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil {
Expand All @@ -144,6 +149,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return sdb, nil
}

func (s *StateDB) UsePrefetcher(prefetcher *TriePrefetcher) {
if prefetcher != nil {
s.prefetcher = prefetcher
s.prefetcher.Resume(s.originalRoot)
}
}

// setError remembers the first non-nil error it is called with.
func (s *StateDB) setError(err error) {
if s.dbErr == nil {
Expand Down Expand Up @@ -532,7 +544,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now())
}
var acc *snapshot.Account
if acc, err = s.snap.Account(crypto.Keccak256Hash(addr.Bytes())); err == nil {
if acc, err = s.snap.Account(crypto.HashData(s.hasher, addr.Bytes())); err == nil {
if acc == nil {
return nil
}
Expand Down Expand Up @@ -675,6 +687,7 @@ func (s *StateDB) Copy() *StateDB {
logSize: s.logSize,
preimages: make(map[common.Hash][]byte, len(s.preimages)),
journal: newJournal(),
hasher: crypto.NewKeccakState(),
}
// Copy the dirty states, logs, and preimages
for addr := range s.journal.dirties {
Expand Down Expand Up @@ -760,6 +773,7 @@ func (s *StateDB) GetRefund() uint64 {
// the journal as well as the refunds. Finalise, however, will not push any updates
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
var addressesToPrefetch []common.Address
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
if !exist {
Expand Down Expand Up @@ -788,7 +802,17 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
s.stateObjectsPending[addr] = struct{}{}
s.stateObjectsDirty[addr] = struct{}{}
// At this point, also ship the address off to the precacher. The precacher
// will start loading tries, and when the change is eventually committed,
// the commit-phase will be a lot faster
if s.prefetcher != nil {
addressesToPrefetch = append(addressesToPrefetch, addr)
}
}
if s.prefetcher != nil {
s.prefetcher.PrefetchAddresses(addressesToPrefetch)
}

// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
}
Expand All @@ -800,6 +824,21 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

// Now we're about to start to write changes to the trie. The trie is so
// far _untouched_. We can check with the prefetcher, if it can give us
// a trie which has the same root, but also has some content loaded into it.
// If so, use that one instead.
if s.prefetcher != nil {
s.prefetcher.Pause()
// We only want to do this _once_, if someone calls IntermediateRoot again,
// we shouldn't fetch the trie again
if s.originalRoot != (common.Hash{}) {
if trie := s.prefetcher.GetTrie(s.originalRoot); trie != nil {
s.trie = trie
}
s.originalRoot = common.Hash{}
}
}
for addr := range s.stateObjectsPending {
obj := s.stateObjects[addr]
if obj.deleted {
Expand Down
Loading

0 comments on commit 1e1865b

Please sign in to comment.