Skip to content

Commit

Permalink
make getclosestpeers only return peers we can actually connect to
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Jun 7, 2018
1 parent 1290c4e commit 6b33279
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
log.Debugf("closestPeers query run error: %s", err)
}

if res != nil && res.finalSet != nil {
sorted := kb.SortClosestPeers(res.finalSet.Peers(), kb.ConvertKey(key))
if res != nil && res.queriedSet != nil {
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}
Expand Down
10 changes: 8 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type dhtQueryResult struct {
closerPeers []*pstore.PeerInfo // *
success bool

finalSet *pset.PeerSet
finalSet *pset.PeerSet
queriedSet *pset.PeerSet
}

// constructs query
Expand Down Expand Up @@ -77,6 +78,7 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e
type dhtQueryRunner struct {
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersQueried *pset.PeerSet // peers successfully connected to and queried
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing

Expand All @@ -100,6 +102,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
peersQueried: pset.New(),
rateLimit: make(chan struct{}, q.concurrency),
proc: proc,
}
Expand Down Expand Up @@ -164,7 +167,8 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
}

return &dhtQueryResult{
finalSet: r.peersSeen,
finalSet: r.peersSeen,
queriedSet: r.peersQueried,
}, err
}

Expand Down Expand Up @@ -274,6 +278,8 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)

r.peersQueried.Add(p)

if err != nil {
log.Debugf("ERROR worker for: %v %v", p, err)
r.Lock()
Expand Down

0 comments on commit 6b33279

Please sign in to comment.