p2p/discover: avoid dropping unverified nodes when table is almost empty #21396

Merged: 9 commits, Aug 24, 2020
13 changes: 7 additions & 6 deletions p2p/discover/lookup.go
@@ -104,9 +104,7 @@ func (it *lookup) startQueries() bool {

 	// The first query returns nodes from the local table.
 	if it.queries == -1 {
-		it.tab.mutex.Lock()
-		closest := it.tab.closest(it.result.target, bucketSize, false)
-		it.tab.mutex.Unlock()
+		closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
 		// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
 		// for the table to fill in this case, but there is no good mechanism for that
 		// yet.
@@ -150,11 +148,14 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
 	} else if len(r) == 0 {
 		fails++
 		it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
-		it.tab.log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "results", len(r), "err", err)
-		if fails >= maxFindnodeFailures {
-			it.tab.log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
+		// Remove the node from the local table if it fails to return anything useful too
+		// many times, but only if there are enough other nodes in the bucket.
+		dropped := false
+		if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
+			dropped = true
 			it.tab.delete(n)
 		}
+		it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
 	} else if fails > 0 {
 		// Reset failure counter because it counts _consecutive_ failures.
 		it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
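The eviction guard added above boils down to a single predicate. The following is a self-contained restatement in Go; the constant values maxFindnodeFailures = 5 and bucketSize = 16 match the p2p/discover package at the time of this change, but treat them as assumptions of this sketch, not part of the diff.

package main

import "fmt"

// Assumed values of the p2p/discover constants referenced by the new guard.
const (
	maxFindnodeFailures = 5
	bucketSize          = 16
)

// shouldDrop restates the condition added in query above: evict a node only
// after enough consecutive FINDNODE failures, and only when its bucket still
// holds at least bucketSize/2 entries. An almost-empty table therefore never
// sheds its last known peers.
func shouldDrop(fails, bucketLen int) bool {
	return fails >= maxFindnodeFailures && bucketLen >= bucketSize/2
}

func main() {
	fmt.Println(shouldDrop(5, 12)) // true: plenty of alternatives remain
	fmt.Println(shouldDrop(5, 3))  // false: keep the node, bucket is nearly empty
}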
43 changes: 32 additions & 11 deletions p2p/discover/table.go
@@ -392,22 +392,35 @@ func (tab *Table) copyLiveNodes() {
 	}
 }

-// closest returns the n nodes in the table that are closest to the
-// given id. The caller must hold tab.mutex.
-func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance {
-	// This is a very wasteful way to find the closest nodes but
-	// obviously correct. I believe that tree-based buckets would make
-	// this easier to implement efficiently.
-	close := &nodesByDistance{target: target}
+// findnodeByID returns the n nodes in the table that are closest to the given id.
+// This is used by the FINDNODE/v4 handler.
+//
+// The preferLive parameter says whether the caller wants liveness-checked results. If
+// preferLive is true and the table contains any verified nodes, the result will not
+// contain unverified nodes. However, if there are no verified nodes at all, the result
+// will contain unverified nodes.
+func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance {
+	tab.mutex.Lock()
+	defer tab.mutex.Unlock()
+
+	// Scan all buckets. There might be a better way to do this, but there aren't that many
+	// buckets, so this solution should be fine. The worst-case complexity of this loop
+	// is O(tab.len() * nresults).
+	nodes := &nodesByDistance{target: target}
+	liveNodes := &nodesByDistance{target: target}
 	for _, b := range &tab.buckets {
 		for _, n := range b.entries {
-			if checklive && n.livenessChecks == 0 {
-				continue
+			nodes.push(n, nresults)
+			if preferLive && n.livenessChecks > 0 {
+				liveNodes.push(n, nresults)
 			}
-			close.push(n, nresults)
 		}
 	}
-	return close
+
+	if preferLive && len(liveNodes.entries) > 0 {
+		return liveNodes
+	}
+	return nodes
 }

 // len returns the number of nodes in the table.
@@ -421,6 +434,14 @@ func (tab *Table) len() (n int) {
 	return n
 }

+// bucketLen returns the number of nodes in the bucket for the given ID.
+func (tab *Table) bucketLen(id enode.ID) int {
+	tab.mutex.Lock()
+	defer tab.mutex.Unlock()
+
+	return len(tab.bucket(id).entries)
+}
+
 // bucket returns the bucket for the given node ID hash.
 func (tab *Table) bucket(id enode.ID) *bucket {
 	d := enode.LogDist(tab.self().ID(), id)
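The preferLive fallback documented on findnodeByID can be shown in isolation. The sketch below uses a stripped-down node type and a hypothetical helper name (selectNodes); only the selection logic mirrors the function above.

package main

import "fmt"

// node is a stand-in for p2p/discover's node type; only the livenessChecks
// counter matters for this sketch.
type node struct {
	id             string
	livenessChecks uint
}

// selectNodes mirrors the preferLive behavior of findnodeByID: gather all
// entries and the verified subset, then fall back to the full set only when
// no verified node exists at all.
func selectNodes(entries []node, preferLive bool) []node {
	var all, live []node
	for _, n := range entries {
		all = append(all, n)
		if preferLive && n.livenessChecks > 0 {
			live = append(live, n)
		}
	}
	if preferLive && len(live) > 0 {
		return live
	}
	return all
}

func main() {
	// A table holding only unverified entries still answers FINDNODE.
	fmt.Println(len(selectNodes([]node{{"a", 0}, {"b", 0}}, true))) // 2: fallback
	// One verified entry is enough to filter the unverified ones out.
	fmt.Println(len(selectNodes([]node{{"a", 0}, {"b", 3}}, true))) // 1: live only
}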
4 changes: 2 additions & 2 deletions p2p/discover/table_test.go
@@ -190,7 +190,7 @@ func checkIPLimitInvariant(t *testing.T, tab *Table) {
 	}
 }

-func TestTable_closest(t *testing.T) {
+func TestTable_findnodeByID(t *testing.T) {
 	t.Parallel()

 	test := func(test *closeTest) bool {
@@ -202,7 +202,7 @@ func TestTable_closest(t *testing.T) {
 		fillTable(tab, test.All)

 		// check that closest(Target, N) returns nodes
-		result := tab.closest(test.Target, test.N, false).entries
+		result := tab.findnodeByID(test.Target, test.N, false).entries
 		if hasDuplicates(result) {
 			t.Errorf("result contains duplicates")
 			return false
17 changes: 12 additions & 5 deletions p2p/discover/v4_udp.go
@@ -324,7 +324,16 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
 		Target:     target,
 		Expiration: uint64(time.Now().Add(expiration).Unix()),
 	})
-	return nodes, <-rm.errc
+	// Ensure that callers don't see a timeout if the node actually responded. Since
+	// findnode can receive more than one neighbors response, the reply matcher will be
+	// active until the remote node sends enough nodes. If the remote end doesn't have
+	// enough nodes the reply matcher will time out waiting for the second reply, but
+	// there's no need for an error in that case.
+	err := <-rm.errc
+	if err == errTimeout && rm.reply != nil {
+		err = nil
+	}
+	return nodes, err
 }

 // RequestENR sends enrRequest to the given node and waits for a response.
@@ -453,9 +462,9 @@ func (t *UDPv4) loop() {
 			if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
 				ok, requestDone := p.callback(r.data)
 				matched = matched || ok
+				p.reply = r.data
 				// Remove the matcher if callback indicates that all replies have been received.
 				if requestDone {
-					p.reply = r.data
 					p.errc <- nil
 					plist.Remove(el)
 				}
@@ -715,9 +724,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno

 	// Determine closest nodes.
 	target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
-	t.tab.mutex.Lock()
-	closest := t.tab.closest(target, bucketSize, true).entries
-	t.tab.mutex.Unlock()
+	closest := t.tab.findnodeByID(target, bucketSize, true).entries

 	// Send neighbors in chunks with at most maxNeighbors per packet
 	// to stay below the packet size limit.
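The timeout handling added to findnode follows a general pattern: when a matcher may legitimately receive fewer reply packets than it waits for, a trailing timeout should not surface as an error once at least one reply has been recorded. A minimal self-contained sketch with hypothetical names (awaitReplies, sawReply); errTimeout is redefined locally so the sketch stands alone.

package main

import (
	"errors"
	"fmt"
)

// errTimeout mirrors the package-level error in p2p/discover.
var errTimeout = errors.New("RPC timeout")

// awaitReplies downgrades a timeout to success when a reply was already seen,
// matching the rm.reply != nil check in findnode: the remote answered, it just
// had fewer neighbors packets to send than the matcher was waiting for.
func awaitReplies(errc <-chan error, sawReply bool) error {
	if err := <-errc; err != errTimeout || !sawReply {
		return err
	}
	return nil
}

func main() {
	errc := make(chan error, 1)
	errc <- errTimeout
	fmt.Println(awaitReplies(errc, true)) // <nil>: one reply arrived in time
}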
88 changes: 87 additions & 1 deletion p2p/discover/v4_udp_test.go
@@ -22,6 +22,7 @@ import (
 	crand "crypto/rand"
 	"encoding/binary"
 	"errors"
+	"fmt"
 	"io"
 	"math/rand"
 	"net"
@@ -277,7 +278,7 @@ func TestUDPv4_findnode(t *testing.T) {
 	test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())

 	// check that closest neighbors are returned.
-	expected := test.table.closest(testTarget.ID(), bucketSize, true)
+	expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
 	test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
 	waitNeighbors := func(want []*node) {
 		test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
@@ -493,6 +494,91 @@ func TestUDPv4_EIP868(t *testing.T) {
 	})
 }

+// This test verifies that a small network of nodes can boot up into a healthy state.
+func TestUDPv4_smallNetConvergence(t *testing.T) {
+	t.Parallel()
+
+	// Start the network.
+	nodes := make([]*UDPv4, 4)
+	for i := range nodes {
+		var cfg Config
+		if i > 0 {
+			bn := nodes[0].Self()
+			cfg.Bootnodes = []*enode.Node{bn}
+		}
+		nodes[i] = startLocalhostV4(t, cfg)
+		defer nodes[i].Close()
+	}
+
+	// Run through the iterator on all nodes until
+	// they have all found each other.
+	status := make(chan error, len(nodes))
+	for i := range nodes {
+		node := nodes[i]
+		go func() {
+			found := make(map[enode.ID]bool, len(nodes))
+			it := node.RandomNodes()
+			for it.Next() {
+				found[it.Node().ID()] = true
+				if len(found) == len(nodes) {
+					status <- nil
+					return
+				}
+			}
+			status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
+		}()
+	}
+
+	// Wait for all status reports.
+	timeout := time.NewTimer(30 * time.Second)
+	defer timeout.Stop()
+	for received := 0; received < len(nodes); {
+		select {
+		case <-timeout.C:
+			for _, node := range nodes {
+				node.Close()
+			}
+		case err := <-status:
+			received++
+			if err != nil {
+				t.Error("ERROR:", err)
+				return
+			}
+		}
+	}
+}
+
+func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
+	t.Helper()
+
+	cfg.PrivateKey = newkey()
+	db, _ := enode.OpenDB("")
+	ln := enode.NewLocalNode(db, cfg.PrivateKey)
+
+	// Prefix logs with node ID.
+	lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
+	lfmt := log.TerminalFormat(false)
+	cfg.Log = testlog.Logger(t, log.LvlTrace)
+	cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
+		t.Logf("%s %s", lprefix, lfmt.Format(r))
+		return nil
+	}))
+
+	// Listen.
+	socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	realaddr := socket.LocalAddr().(*net.UDPAddr)
+	ln.SetStaticIP(realaddr.IP)
+	ln.SetFallbackUDP(realaddr.Port)
+	udp, err := ListenV4(socket, ln, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return udp
+}
+
 // dgramPipe is a fake UDP socket. It queues all sent datagrams.
 type dgramPipe struct {
 	mu *sync.Mutex