core/state/snapshot: cleanup generation logic #24479

Merged · 1 commit · Mar 29, 2022
235 changes: 137 additions & 98 deletions core/state/snapshot/generate.go
@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error

// generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through rangeproof and skip
// either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand.
func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range
@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
return !trieMore && !result.diskMore, last, nil
}
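
The doc comment above captures the core idea of `generateRange`: try to prove the existing flat state correct and skip the write entirely, falling back to a trie iteration only when the proof fails. A standalone sketch of that decision follows; `proveFlat` and `regenerate` are hypothetical stand-ins for geth's internals, and the `last == nil` convention for exhaustion is a simplification for illustration.

```go
package main

import "fmt"

// generateSegment sketches generateRange's two paths. proveFlat and
// regenerate are illustrative stand-ins, not real geth APIs: the first
// range-proves the already-stored flat state, the second re-iterates
// the trie to rebuild it. A nil `last` means the key space is done.
func generateSegment(origin []byte, max int) (exhausted bool, last []byte, err error) {
	ok, last := proveFlat(origin, max) // cheap path: verify what's on disk
	if ok {
		return last == nil, last, nil // proof passed, skip regeneration
	}
	last, err = regenerate(origin, max) // slow path: iterate the trie
	return last == nil, last, err
}

func proveFlat(origin []byte, max int) (bool, []byte)   { return true, nil }
func regenerate(origin []byte, max int) ([]byte, error) { return nil, nil }

func main() {
	done, _, _ := generateSegment(nil, 128)
	fmt.Println("exhausted:", done)
}
```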

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var (
accMarker []byte
accountRange = accountCheckRange
)
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
// Always reset the initial account range as 1
// whenever recover from the interruption.
accMarker, accountRange = dl.genMarker[:common.HashLength], 1
// checkAndFlush checks if an interruption signal is received or the
// batch size has exceeded the allowance.
func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
var abort chan *generatorStats
select {
case abort = <-dl.genAbort:
default:
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
accOrigin = common.CopyBytes(accMarker)
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(current, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
}
// Flush out the batch anyway, whether it's empty or not.
// It's possible that all the states are recovered and the
// generation indeed makes progress.
journalProgress(batch, current, stats)

checkAndFlush := func(currentLocation []byte) error {
select {
case abort = <-dl.genAbort:
default:
if err := batch.Write(); err != nil {
return err
}
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(currentLocation, dl.genMarker) < 0 {
log.Error("Snapshot generator went backwards",
"currentLocation", fmt.Sprintf("%x", currentLocation),
"genMarker", fmt.Sprintf("%x", dl.genMarker))
}
batch.Reset()

// Flush out the batch anyway no matter it's empty or not.
// It's possible that all the states are recovered and the
// generation indeed makes progress.
journalProgress(batch, currentLocation, stats)
dl.lock.Lock()
dl.genMarker = current
dl.lock.Unlock()

if err := batch.Write(); err != nil {
return err
}
batch.Reset()
if abort != nil {
stats.Log("Aborting state snapshot generation", dl.root, current)
return newAbortErr(abort) // bubble up an error for interruption
}
}
if time.Since(*logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, current)
*logged = time.Now()
}
return nil
}
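
For readers skimming the diff, the pattern being factored out into `checkAndFlush` is worth isolating: a non-blocking `select` drains a pending abort signal, and the flush happens either when the batch crosses a size threshold or when a stop was requested, always journaling progress first so a restart can resume. A simplified, self-contained sketch under those assumptions (the batch and channel types are stand-ins, not geth's):

```go
package main

import (
	"errors"
	"fmt"
)

const idealBatchSize = 100 * 1024

var errAborted = errors.New("aborted")

// checkAndFlush polls for an abort without blocking, then flushes once
// the batch is big enough or an interruption is pending. The marker is
// "journaled" (printed here) before the reset so progress survives.
func checkAndFlush(abortCh <-chan struct{}, batch *[]byte, marker []byte) error {
	var aborted bool
	select {
	case <-abortCh: // observe a pending signal only; never block
		aborted = true
	default:
	}
	if len(*batch) > idealBatchSize || aborted {
		fmt.Printf("journal marker %x, flush %d bytes\n", marker, len(*batch))
		*batch = (*batch)[:0]
		if aborted {
			return errAborted // bubble the interruption up to the caller
		}
	}
	return nil
}

func main() {
	abort := make(chan struct{}, 1)
	batch := []byte("pending writes")
	abort <- struct{}{}
	fmt.Println(checkAndFlush(abort, &batch, []byte{0xab})) // aborted
}
```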

dl.lock.Lock()
dl.genMarker = currentLocation
dl.lock.Unlock()
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())

if abort != nil {
stats.Log("Aborting state snapshot generation", dl.root, currentLocation)
return errors.New("aborted")
}
if delete {
rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
if time.Since(logged) > 8*time.Second {
stats.Log("Generating state snapshot", dl.root, currentLocation)
logged = time.Now()
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++

// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
return err
}
return nil
}
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err // The procedure is aborted, either by an external signal or an internal error.
}
// Abort the procedure if the entire contract storage is generated
if exhausted {
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
}
return nil
}
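
The for-loop above is the generic resumption shape used for both storage and account generation: process one chunk, then restart from `last+1` until the range is exhausted or the key space overflows. A runnable sketch of that loop, with a hypothetical `processChunk` standing in for `generateRange`, over a toy 16-bit key space:

```go
package main

import "fmt"

// increaseKey increments a fixed-width key by one with carry, returning
// nil once the key was already all 0xff (i.e. the key space overflowed).
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0 {
			return key
		}
	}
	return nil
}

// processChunk stands in for generateRange: it "handles" up to max keys
// from origin and reports the last key covered plus whether the whole
// remaining key space was consumed.
func processChunk(origin []byte, max int) (exhausted bool, last []byte) {
	start := int(origin[0])<<8 | int(origin[1])
	end := start + max - 1
	if end >= 0xffff {
		return true, []byte{0xff, 0xff}
	}
	return false, []byte{byte(end >> 8), byte(end)}
}

func main() {
	origin, chunks := []byte{0x00, 0x00}, 0
	for {
		chunks++
		exhausted, last := processChunk(origin, 4096)
		if exhausted {
			break
		}
		if origin = increaseKey(last); origin == nil {
			break // special case: last was 0xff...ff
		}
	}
	fmt.Println("chunks processed:", chunks) // 16 for a 16-bit key space
}
```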

// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
var (
start = time.Now()
@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
marker = dl.genMarker[:]
}
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(marker); err != nil {
if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
return err
}
// If the iterated account is the contract, create a further loop to
@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
storeMarker = dl.genMarker[common.HashLength:]
}
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())

if delete {
rawdb.DeleteStorageSnapshot(batch, accountHash, common.BytesToHash(key))
snapWipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
snapGeneratedStorageMeter.Mark(1)
} else {
snapRecoveredStorageMeter.Mark(1)
}
stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
stats.slots++

// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
return err
}
return nil
}
var storeOrigin = common.CopyBytes(storeMarker)
for {
exhausted, last, err := dl.generateRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
if err != nil {
return err
}
if exhausted {
break
}
if storeOrigin = increaseKey(last); storeOrigin == nil {
break // special case, the last is 0xffffffff...fff
}
if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
return err
}
}
// Some account processed, unmark the marker
accMarker = nil
return nil
}

// Global loop for regerating the entire state trie + all layered storage tries.
// Always reset the initial account range to 1 whenever recovering from an interruption.
var accountRange = accountCheckRange
if len(accMarker) > 0 {
accountRange = 1
}
// Global loop for re-generating the account snapshots + all layered storage snapshots.
origin := common.CopyBytes(accMarker)
for {
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP)
// The procedure it aborted, either by external signal or internal error
exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
if err != nil {
if abort == nil { // aborted by internal error, wait the signal
abort = <-dl.genAbort
}
abort <- stats
return
return err // The procedure is aborted, either by an external signal or an internal error.
}
// Abort the procedure if the entire snapshot is generated
if exhausted {
break
}
if accOrigin = increaseKey(last); accOrigin == nil {
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
accountRange = accountCheckRange
}
return nil
}
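
Both resume points in this function come from slicing `dl.genMarker`: the first 32 bytes identify the interrupted account, and any trailing bytes are the storage key inside it — which is also why the account range is reset to 1 on recovery, since the first chunk must re-enter exactly that account. A small standalone sketch of that marker layout, assuming 32-byte hashes:

```go
package main

import "fmt"

const hashLength = 32 // mirrors common.HashLength

// splitMarker decodes a generation marker: the first 32 bytes name the
// account being processed; any remainder is the storage slot inside it.
// An empty marker means generation starts from scratch.
func splitMarker(marker []byte) (account, storage []byte) {
	if len(marker) == 0 {
		return nil, nil
	}
	account = marker[:hashLength]
	if len(marker) > hashLength {
		storage = marker[hashLength:]
	}
	return account, storage
}

func main() {
	marker := make([]byte, 2*hashLength) // account hash + storage key
	marker[0], marker[hashLength] = 0xaa, 0x55
	acc, slot := splitMarker(marker)
	fmt.Printf("account %x..., slot %x...\n", acc[:2], slot[:2])
}
```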

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
var accMarker []byte
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
accMarker = dl.genMarker[:common.HashLength]
}
var (
batch = dl.diskdb.NewBatch()
logged = time.Now()
abort chan *generatorStats
)
stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)

// Generate the snapshot accounts from the point where they left off.
if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
// Extract the received interruption signal if it exists
if aerr, ok := err.(*abortErr); ok {
abort = aerr.abort
}
// Aborted by internal error, wait for the signal
if abort == nil {
abort = <-dl.genAbort
}
abort <- stats
return
}
// Snapshot fully generated, set the marker to nil.
// Note that even if there is nothing to commit, persist the
// generator anyway to mark the snapshot as complete.
Expand Down Expand Up @@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
}

// increaseKey increase the input key by one bit. Return nil if the entire
// addition operation overflows,
// addition operation overflows.
func increaseKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
}
return nil
}
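
Note that the hunk boundary above collapses the body of the carry check: the loop only returns early when an incremented byte does not wrap to zero. A quick worked example of the behavior, with the function written out as it reads with that collapsed context restored (the function mutates its argument in place):

```go
package main

import "fmt"

// increaseKey, with the context lines collapsed by the diff restored:
// increment the last byte and propagate the carry leftwards; return nil
// only when every byte wrapped around (the key was all 0xff).
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0x0 {
			return key
		}
	}
	return nil
}

func main() {
	fmt.Printf("%x\n", increaseKey([]byte{0x00, 0xff})) // 0100: carry into the high byte
	fmt.Println(increaseKey([]byte{0xff, 0xff}) == nil) // true: full overflow
}
```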

// abortErr wraps the interruption signal received during generation to
// indicate that generation was aborted by an external process.
type abortErr struct {
abort chan *generatorStats
}

func newAbortErr(abort chan *generatorStats) error {
return &abortErr{abort: abort}
}

func (err *abortErr) Error() string {
return "aborted"
}
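
The wrapper exists so the interruption can travel up through ordinary error returns while still carrying the channel on which the aborter expects an acknowledgement. A self-contained sketch of that handoff pattern; `stats` and `work` are simplified stand-ins for geth's `generatorStats` and generation loop:

```go
package main

import (
	"errors"
	"fmt"
)

type stats struct{ processed int }

// abortErr smuggles the aborter's reply channel up the call stack so the
// outermost loop can answer with the final statistics.
type abortErr struct{ abort chan *stats }

func (e *abortErr) Error() string { return "aborted" }

// work does one unit of generation, or wraps a pending abort request
// into an error that the caller is expected to unwrap.
func work(abortCh chan chan *stats, s *stats) error {
	select {
	case ch := <-abortCh:
		return &abortErr{abort: ch}
	default:
		s.processed++
		return nil
	}
}

func main() {
	abortCh := make(chan chan *stats)
	s := new(stats)
	go func() {
		for {
			if err := work(abortCh, s); err != nil {
				var ae *abortErr
				if errors.As(err, &ae) {
					ae.abort <- s // acknowledge the abort with stats
				}
				return
			}
		}
	}()
	reply := make(chan *stats)
	abortCh <- reply // ask the generator to stop...
	fmt.Println("processed before abort:", (<-reply).processed)
}
```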