Commit

Merge pull request ipfs#1 from geoah/master

Move DHT off `ipfs/go-ipfs`

whyrusleeping authored Aug 22, 2016
2 parents be8cafa + f943d30 commit 9ac656b
Showing 23 changed files with 4,546 additions and 3 deletions.
24 changes: 24 additions & 0 deletions .travis.yml
@@ -0,0 +1,24 @@
os:
  - linux
  - osx

language: go

go:
  - 1.5.2

env:
  - GO15VENDOREXPERIMENT=1

install: true

script:
  - make deps
  - go test ./...

cache:
  directories:
    - $GOPATH/src/gx

notifications:
  email: false
10 changes: 10 additions & 0 deletions Makefile
@@ -0,0 +1,10 @@
export IPFS_API ?= v04x.ipfs.io

gx:
	go get -u github.com/whyrusleeping/gx
	go get -u github.com/whyrusleeping/gx-go

deps: gx
	gx --verbose install --global
	gx-go rewrite
	go get ./...
3 changes: 0 additions & 3 deletions README.md

This file was deleted.

340 changes: 340 additions & 0 deletions dht.go
@@ -0,0 +1,340 @@
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after Kademlia with Coral and S/Kademlia modifications.
package dht

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"time"

	key "github.com/ipfs/go-ipfs/blocks/key"
	routing "github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	providers "github.com/ipfs/go-ipfs/routing/dht/providers"
	kb "github.com/ipfs/go-ipfs/routing/kbucket"
	record "github.com/ipfs/go-ipfs/routing/record"

	proto "github.com/gogo/protobuf/proto"
	ds "github.com/ipfs/go-datastore"
	ci "github.com/ipfs/go-libp2p-crypto"
	peer "github.com/ipfs/go-libp2p-peer"
	pstore "github.com/ipfs/go-libp2p-peerstore"
	host "github.com/ipfs/go-libp2p/p2p/host"
	protocol "github.com/ipfs/go-libp2p/p2p/protocol"
	logging "github.com/ipfs/go-log"
	goprocess "github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
	context "golang.org/x/net/context"
)

var log = logging.Logger("dht")

// ProtocolDHT is the libp2p protocol ID spoken by this DHT.
var ProtocolDHT protocol.ID = "/ipfs/dht"

// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js

// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
	host      host.Host        // the network services we need
	self      peer.ID          // Local peer (yourself)
	peerstore pstore.Peerstore // Peer Registry

	datastore ds.Datastore // Local data

	routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
	providers    *providers.ProviderManager

	birth    time.Time  // When this peer started up
	diaglock sync.Mutex // lock to make diagnostics work better

	Validator record.Validator // record validator funcs
	Selector  record.Selector  // record selection funcs

	ctx  context.Context
	proc goprocess.Process

	strmap map[peer.ID]*messageSender
	smlk   sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
	dht := new(IpfsDHT)
	dht.datastore = dstore
	dht.self = h.ID()
	dht.peerstore = h.Peerstore()
	dht.host = h

	// register for network notifs.
	dht.host.Network().Notify((*netNotifiee)(dht))

	dht.proc = goprocess.WithTeardown(func() error {
		// remove ourselves from network notifs.
		dht.host.Network().StopNotify((*netNotifiee)(dht))
		return nil
	})

	dht.strmap = make(map[peer.ID]*messageSender)
	dht.ctx = ctx

	h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
	dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore)
	dht.proc.AddChild(dht.providers.Process())
	goprocessctx.CloseAfterContext(dht.proc, ctx)

	dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
	dht.birth = time.Now()

	dht.Validator = make(record.Validator)
	dht.Validator["pk"] = record.PublicKeyValidator

	dht.Selector = make(record.Selector)
	dht.Selector["pk"] = record.PublicKeySelector

	return dht
}
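
// Example (illustrative only, not part of this commit): a minimal sketch of
// wiring up a DHT. It assumes the caller already has a libp2p host.Host and
// a ds.Batching datastore from elsewhere; exampleNewDHT, h, and dstore are
// all hypothetical names.
func exampleNewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
	// NewDHT registers the stream handler, subscribes to network
	// notifications, and starts the provider manager.
	d := NewDHT(ctx, h, dstore)

	// Seed the routing table with peers we are already connected to.
	for _, p := range h.Network().Peers() {
		d.Update(ctx, p)
	}
	return d
}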

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
	key key.Key, rec *pb.Record) error {

	pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
	pmes.Record = rec
	rpmes, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case ErrReadTimeout:
		log.Warningf("read timeout: %s %s", p.Pretty(), key)
		fallthrough
	default:
		return err
	case nil:
		break
	}

	if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
		return errors.New("value not put correctly")
	}
	return nil
}

var errInvalidRecord = errors.New("received invalid record")

// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
	key key.Key) (*pb.Record, []pstore.PeerInfo, error) {

	pmes, err := dht.getValueSingle(ctx, p, key)
	if err != nil {
		return nil, nil, err
	}

	// Perhaps we were given closer peers
	peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

	if record := pmes.GetRecord(); record != nil {
		// Success! We were given the value
		log.Debug("getValueOrPeers: got value")

		// make sure record is valid.
		err = dht.verifyRecordOnline(ctx, record)
		if err != nil {
			log.Info("Received invalid record! (discarded)")
			// return a sentinel to signify that an invalid record was received
			err = errInvalidRecord
			record = new(pb.Record)
		}
		return record, peers, err
	}

	if len(peers) > 0 {
		log.Debug("getValueOrPeers: peers")
		return nil, peers, nil
	}

	log.Warning("getValueOrPeers: routing.ErrNotFound")
	return nil, nil, routing.ErrNotFound
}
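
// Example (illustrative only, not part of this commit): a naive sketch of how
// a caller can drive getValueOrPeers, walking toward the key until a record
// is found or no closer peers remain. Real lookups query several peers
// concurrently; exampleGetLoop is a hypothetical helper.
func exampleGetLoop(ctx context.Context, dht *IpfsDHT, start peer.ID, k key.Key) (*pb.Record, error) {
	p := start
	for {
		rec, peers, err := dht.getValueOrPeers(ctx, p, k)
		switch {
		case err == nil && rec != nil:
			return rec, nil // got a valid record
		case err != nil && err != errInvalidRecord:
			return nil, err // hard failure; invalid records just keep walking
		case len(peers) == 0:
			return nil, routing.ErrNotFound
		}
		p = peers[0].ID // follow the closest peer we were given
	}
}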

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
	key key.Key) (*pb.Message, error) {
	defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()

	pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
		log.Warningf("read timeout: %s %s", p.Pretty(), key)
		fallthrough
	default:
		return nil, err
	}
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key key.Key) (*pb.Record, error) {

	log.Debugf("getLocal %s", key)
	v, err := dht.datastore.Get(key.DsKey())
	if err != nil {
		return nil, err
	}
	log.Debug("found in db")

	byt, ok := v.([]byte)
	if !ok {
		return nil, errors.New("value stored in datastore not []byte")
	}
	rec := new(pb.Record)
	err = proto.Unmarshal(byt, rec)
	if err != nil {
		return nil, err
	}

	err = dht.verifyRecordLocally(rec)
	if err != nil {
		log.Debugf("local record verify failed: %s (discarded)", err)
		return nil, err
	}

	return rec, nil
}

// getOwnPrivateKey attempts to load the local peer's private
// key from the peerstore.
func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
	sk := dht.peerstore.PrivKey(dht.self)
	if sk == nil {
		log.Warningf("%s dht cannot get own private key!", dht.self)
		return nil, fmt.Errorf("cannot get private key to sign record")
	}
	return sk, nil
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key key.Key, rec *pb.Record) error {
	data, err := proto.Marshal(rec)
	if err != nil {
		return err
	}

	return dht.datastore.Put(key.DsKey(), data)
}
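
// Example (illustrative only, not part of this commit): records round-trip
// through the local datastore as protobuf bytes, and getLocal re-verifies
// them on the way out. exampleLocalRoundTrip is a hypothetical helper.
func exampleLocalRoundTrip(dht *IpfsDHT, k key.Key, rec *pb.Record) (*pb.Record, error) {
	if err := dht.putLocal(k, rec); err != nil {
		return nil, err
	}
	return dht.getLocal(k)
}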

// Update signals the routingTable to Update its last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
	log.Event(ctx, "updatePeer", p)
	dht.routingTable.Update(p)
}

// FindLocal looks for a peer with a given ID connected to this dht
// and returns its peer info, or an empty pstore.PeerInfo if the peer
// is not in the routing table.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
	p := dht.routingTable.Find(id)
	if p != "" {
		return dht.peerstore.PeerInfo(p)
	}
	return pstore.PeerInfo{}
}
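
// Example (illustrative only, not part of this commit): because FindLocal
// returns a zero-value pstore.PeerInfo on a miss, callers test the ID field.
// exampleKnowsPeer is a hypothetical helper.
func exampleKnowsPeer(dht *IpfsDHT, id peer.ID) bool {
	return dht.FindLocal(id).ID != ""
}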

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
	defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()

	pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
		log.Warningf("read timeout: %s %s", p.Pretty(), id)
		fallthrough
	default:
		return nil, err
	}
}

// findProvidersSingle asks peer 'p' for providers of the given key
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) {
	defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()

	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
	resp, err := dht.sendRequest(ctx, p, pmes)
	switch err {
	case nil:
		return resp, nil
	case ErrReadTimeout:
		log.Warningf("read timeout: %s %s", p.Pretty(), key)
		fallthrough
	default:
		return nil, err
	}
}

// nearestPeersToQuery returns the routing table's closest peers to the message's key.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
	key := key.Key(pmes.GetKey())
	closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
	return closer
}

// betterPeersToQuery returns nearestPeersToQuery, filtered to exclude
// both ourselves and the requesting peer.
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
	closer := dht.nearestPeersToQuery(pmes, count)

	// no nodes? return nil
	if closer == nil {
		return nil
	}

	// == to self? that's bad
	for _, p := range closer {
		if p == dht.self {
			log.Debug("attempted to return self! this shouldn't happen...")
			return nil
		}
	}

	var filtered []peer.ID
	for _, clp := range closer {
		// don't send a peer back to itself
		if p == clp {
			continue
		}

		filtered = append(filtered, clp)
	}

	// ok, these seem like closer nodes
	return filtered
}

// Context returns the dht's context
func (dht *IpfsDHT) Context() context.Context {
	return dht.ctx
}

// Process returns the dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
	return dht.proc
}

// Close calls Process Close
func (dht *IpfsDHT) Close() error {
	return dht.proc.Close()
}
