Skip to content

Commit

Permalink
statesync: improve e2e test outcomes (backport #6378) (#6380)
Browse files Browse the repository at this point in the history
(cherry picked from commit d36a5905a67db1ed7afb09f371b3ea3910afb6eb)

Co-authored-by: Sam Kleinman <[email protected]>
  • Loading branch information
tnasu and tychoish committed Jan 12, 2022
1 parent fb1b65d commit dd96011
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 21 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,10 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
return errors.New("found empty rpc_servers entry")
}
}
if cfg.DiscoveryTime != 0 && cfg.DiscoveryTime < 5*time.Second {
return errors.New("discovery time must be 0s or greater than five seconds")
}

if cfg.TrustPeriod <= 0 {
return errors.New("trusted_period is required")
}
Expand Down
13 changes: 9 additions & 4 deletions statesync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,16 @@ func (r *Reactor) Sync(
r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
r.mtx.Unlock()

// Request snapshots from all currently connected peers
r.Logger.Debug("Requesting snapshots from known peers")
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
hook := func() {
r.Logger.Debug("Requesting snapshots from known peers")
// Request snapshots from all currently connected peers
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
}

hook()

state, previousState, commit, err := r.syncer.SyncAny(discoveryTime, hook)

state, previousState, commit, err := r.syncer.SyncAny(discoveryTime)
r.mtx.Lock()
r.syncer = nil
r.mtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions statesync/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func (p *snapshotPool) Ranked() []*snapshot {
defer p.Unlock()

candidates := make([]*snapshot, 0, len(p.snapshots))
for _, snapshot := range p.snapshots {
candidates = append(candidates, snapshot)
for key := range p.snapshots {
candidates = append(candidates, p.snapshots[key])
}

sort.Slice(candidates, func(i, j int) bool {
Expand Down
10 changes: 9 additions & 1 deletion statesync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
chunkTimeout = 2 * time.Minute
// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
chunkRequestTimeout = 10 * time.Second
// minimumDiscoveryTime is the lowest non-zero discovery time that
// SyncAny accepts; a discovery time of zero disables waiting.
minimumDiscoveryTime = 5 * time.Second
)

var (
Expand Down Expand Up @@ -125,7 +128,11 @@ func (s *syncer) RemovePeer(peer p2p.Peer) {
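// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
// snapshots if none were found and discoveryTime > 0. In the updated signature, retryHook is
// invoked before each additional discovery wait so the caller can re-request snapshots from peers.
// It returns the latest state, previous state and block commit
// which the caller must use to bootstrap the node.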
func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, sm.State, *types.Commit, error) {
func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, sm.State, *types.Commit, error) {
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
discoveryTime = 5 * minimumDiscoveryTime
}

if discoveryTime > 0 {
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
Expand All @@ -148,6 +155,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, sm.State, *type
if discoveryTime == 0 {
return sm.State{}, sm.State{}, nil, errNoSnapshots
}
retryHook()
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
continue
Expand Down
14 changes: 7 additions & 7 deletions statesync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestSyncer_SyncAny(t *testing.T) {
LastBlockAppHash: []byte("app_hash"),
}, nil)

newState, previousState, lastCommit, err := syncer.SyncAny(0)
newState, previousState, lastCommit, err := syncer.SyncAny(0, func() {})
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) {

func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
syncer, _ := setupOfferSyncer(t)
_, _, _, err := syncer.SyncAny(0)
_, _, _, err := syncer.SyncAny(0, func() {})
assert.Equal(t, errNoSnapshots, err)
}

Expand All @@ -230,7 +230,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0, func() {})
assert.Equal(t, errAbort, err)
connSnapshot.AssertExpectations(t)
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0, func() {})
assert.Equal(t, errNoSnapshots, err)
connSnapshot.AssertExpectations(t)
}
Expand All @@ -288,7 +288,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0, func() {})
assert.Equal(t, errAbort, err)
connSnapshot.AssertExpectations(t)
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0, func() {})
assert.Equal(t, errNoSnapshots, err)
connSnapshot.AssertExpectations(t)
}
Expand All @@ -342,7 +342,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(nil, errBoom)

_, _, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0, func() {})
assert.True(t, errors.Is(err, errBoom))
connSnapshot.AssertExpectations(t)
}
Expand Down
11 changes: 4 additions & 7 deletions test/e2e/app/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package main

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -88,11 +87,10 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) {
if err != nil {
return abci.Snapshot{}, err
}
hash := sha256.Sum256(bz)
snapshot := abci.Snapshot{
Height: state.Height,
Format: 1,
Hash: hash[:],
Hash: hashItems(state.Values),
Chunks: byteChunks(bz),
}
err = ioutil.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", state.Height)), bz, 0644)
Expand All @@ -111,10 +109,9 @@ func (s *SnapshotStore) Create(state *State) (abci.Snapshot, error) {
func (s *SnapshotStore) List() ([]*abci.Snapshot, error) {
s.RLock()
defer s.RUnlock()
snapshots := []*abci.Snapshot{}
for _, snapshot := range s.metadata {
s := snapshot // copy to avoid pointer to range variable
snapshots = append(snapshots, &s)
snapshots := make([]*abci.Snapshot, len(s.metadata))
for idx := range s.metadata {
snapshots[idx] = &s.metadata[idx]
}
return snapshots, nil
}
Expand Down

0 comments on commit dd96011

Please sign in to comment.