Skip to content

Commit

Permalink
fix: don't add unresponsive DHT servers to the Routing Table (#820)
Browse files Browse the repository at this point in the history
* added check to avoid adding unresponsive dht peers to the dht routing table

* removed lock in adding peers to the rt

* made variable names more meaningful

* fixed network loop and corrected tests

* added UsefulPeer() references from current PR

* go mod tidy

* added delay in TestRefreshBelowMinRTThreshold

* addressed review

* go mod tidy

* addressed Jorropo review

* added comments

* removed state of peers probed recently

* fix conflicts merge

* updated deps

* added optimizations documentation

* Update dht.go

* updated md files

---------

Co-authored-by: Jorropo <[email protected]>
  • Loading branch information
guillaumemichel and Jorropo authored Jun 12, 2023
1 parent 8d07d57 commit 8c9fdff
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 931 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

- [Install](#install)
- [Usage](#usage)
- [Optimizations](#optimizations)
- [Contribute](#contribute)
- [Maintainers](#maintainers)
- [License](#license)
Expand All @@ -21,6 +22,10 @@
go get github.com/libp2p/go-libp2p-kad-dht
```

## Optimizations

Client-side optimizations are described in [optimizations.md](./optimizations.md)

## Usage

Go to https://godoc.org/github.com/libp2p/go-libp2p-kad-dht.
Expand Down
110 changes: 62 additions & 48 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -128,6 +123,9 @@ type IpfsDHT struct {

autoRefresh bool

// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
Expand All @@ -143,7 +141,7 @@ type IpfsDHT struct {
disableFixLowPeers bool
fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
addPeerToRTChan chan peer.ID
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
Expand Down Expand Up @@ -247,7 +245,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(dht.ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -309,7 +307,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
Expand Down Expand Up @@ -339,6 +337,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

Expand Down Expand Up @@ -377,6 +377,18 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}

func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
Expand All @@ -388,16 +400,11 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
_, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
dht.lookupCheck,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down Expand Up @@ -505,7 +512,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(ctx, p)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -616,22 +623,22 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
Expand All @@ -651,40 +658,47 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
//
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=0
//
// If we connect to a peer and then exchange a query RPC ->
//
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
//
// If we query a peer we already have in our Routing Table ->
//
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
//
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
// if the peer is already in the routing table or the appropriate bucket is
// already full, don't try to add the new peer.ID
if dht.routingTable.Find(p) != "" || !dht.routingTable.UsefulPeer(p) {
return
}

// verify whether the remote peer advertises the right dht protocol
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():

livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {
logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}
}

// validPeerFound signals the routingTable that we've found a peer that
// supports the DHT protocol, and just answered correctly to a DHT FindPeers
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}

select {
case dht.addPeerToRTChan <- p:
case <-dht.ctx.Done():
return
}
}

Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
11 changes: 9 additions & 2 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"context"
"net"
"sync/atomic"
"testing"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -31,12 +32,17 @@ func TestIsRelay(t *testing.T) {
type mockConn struct {
local peer.AddrInfo
remote peer.AddrInfo

isClosed atomic.Bool
}

var _ network.Conn = (*mockConn)(nil)

func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error { return nil }
func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error {
m.isClosed.Store(true)
return nil
}
func (m *mockConn) NewStream(context.Context) (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.ConnStats {
Expand All @@ -50,6 +56,7 @@ func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }
func (m *mockConn) ConnState() network.ConnectionState { return network.ConnectionState{} }
func (m *mockConn) IsClosed() bool { return m.isClosed.Load() }

func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
6 changes: 4 additions & 2 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT
dht.peerFound(mPeer, true)
// a peer has queried us, let's add it to RT. A new go routine is required
// because we can't block the stream handler until the remote peer answers
// our query.
go dht.peerFound(dht.ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
48 changes: 47 additions & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,8 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
connect(t, ctx, dhtA, dhtD)

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 10*time.Second)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

Expand Down Expand Up @@ -1327,6 +1328,49 @@ func TestClientModeConnect(t *testing.T) {
}
}

func TestInvalidServer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)

// make b advertise all dht server protocols
for _, proto := range a.serverProtocols {
// Hang on every request.
b.host.SetStreamHandler(proto, func(s network.Stream) {
defer s.Reset() // nolint
<-ctx.Done()
})
}

connectNoSync(t, ctx, a, b)

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}

if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}

if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}

func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -2126,6 +2170,8 @@ func TestPreconnectedNodes(t *testing.T) {
require.NoError(t, err)
defer h2.Close()

connect(t, ctx, d1, d2)

// See if it works
peers, err := d2.GetClosestPeers(ctx, "testkey")
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 8c9fdff

Please sign in to comment.