Skip to content

Commit

Permalink
Merge pull request ipfs#159 from libp2p/feat/query-improvements
Browse files Browse the repository at this point in the history
dht query improvements
  • Loading branch information
whyrusleeping authored Jun 7, 2018
2 parents 53421fc + 6b33279 commit ae12944
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
log.Debugf("closestPeers query run error: %s", err)
}

if res != nil && res.finalSet != nil {
sorted := kb.SortClosestPeers(res.finalSet.Peers(), kb.ConvertKey(key))
if res != nil && res.queriedSet != nil {
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}
Expand Down
15 changes: 12 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess"
ctxproc "github.com/jbenet/goprocess/context"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pset "github.com/libp2p/go-libp2p-peer/peerset"
pstore "github.com/libp2p/go-libp2p-peerstore"
Expand All @@ -38,7 +39,8 @@ type dhtQueryResult struct {
closerPeers []*pstore.PeerInfo // *
success bool

finalSet *pset.PeerSet
finalSet *pset.PeerSet
queriedSet *pset.PeerSet
}

// constructs query
Expand Down Expand Up @@ -76,6 +78,7 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e
type dhtQueryRunner struct {
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersQueried *pset.PeerSet // peers successfully connected to and queried
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing

Expand All @@ -99,6 +102,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
peersQueried: pset.New(),
rateLimit: make(chan struct{}, q.concurrency),
proc: proc,
}
Expand Down Expand Up @@ -163,7 +167,8 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
}

return &dhtQueryResult{
finalSet: r.peersSeen,
finalSet: r.peersSeen,
queriedSet: r.peersQueried,
}, err
}

Expand Down Expand Up @@ -236,7 +241,9 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {

// make sure we're connected to the peer.
// FIXME abstract away into the network layer
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
// Note: Failure to connect in this block will cause the function to
// short circuit.
if r.query.dht.host.Network().Connectedness(p) == inet.NotConnected {
log.Debug("not connected. dialing.")

notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Expand Down Expand Up @@ -271,6 +278,8 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)

r.peersQueried.Add(p)

if err != nil {
log.Debugf("ERROR worker for: %v %v", p, err)
r.Lock()
Expand Down

0 comments on commit ae12944

Please sign in to comment.