diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000000..3173bb88bed --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +os: + - linux + - osx + +language: go + +go: + - 1.5.2 + +env: + - GO15VENDOREXPERIMENT=1 + +install: true + +script: + - make deps + - go test ./... + +cache: + directories: + - $GOPATH/src/gx + +notifications: + email: false \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000000..45d0c77e931 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +export IPFS_API ?= v04x.ipfs.io + +gx: + go get -u github.com/whyrusleeping/gx + go get -u github.com/whyrusleeping/gx-go + +deps: gx + gx --verbose install --global + gx-go rewrite + go get ./... \ No newline at end of file diff --git a/README.md b/README.md deleted file mode 100644 index a8b791abbc2..00000000000 --- a/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# go-libp2p-kad-dht - -A kademlia dht implemention on go-libp2p diff --git a/dht.go b/dht.go new file mode 100644 index 00000000000..55ef06ade1b --- /dev/null +++ b/dht.go @@ -0,0 +1,340 @@ +// Package dht implements a distributed hash table that satisfies the ipfs routing +// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications. +package dht + +import ( + "bytes" + "errors" + "fmt" + "sync" + "time" + + key "github.com/ipfs/go-ipfs/blocks/key" + routing "github.com/ipfs/go-ipfs/routing" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + providers "github.com/ipfs/go-ipfs/routing/dht/providers" + kb "github.com/ipfs/go-ipfs/routing/kbucket" + record "github.com/ipfs/go-ipfs/routing/record" + + proto "github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-datastore" + ci "github.com/ipfs/go-libp2p-crypto" + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + host "github.com/ipfs/go-libp2p/p2p/host" + protocol "github.com/ipfs/go-libp2p/p2p/protocol" + logging "github.com/ipfs/go-log" + goprocess "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" + context "golang.org/x/net/context" +) + +var log = logging.Logger("dht") + +var ProtocolDHT protocol.ID = "/ipfs/dht" + +// NumBootstrapQueries defines the number of random dht queries to do to +// collect members of the routing table. +const NumBootstrapQueries = 5 + +// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js + +// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. +// It is used to implement the base IpfsRouting module. +type IpfsDHT struct { + host host.Host // the network services we need + self peer.ID // Local peer (yourself) + peerstore pstore.Peerstore // Peer Registry + + datastore ds.Datastore // Local data + + routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes + providers *providers.ProviderManager + + birth time.Time // When this peer started up + diaglock sync.Mutex // lock to make diagnostics work better + + Validator record.Validator // record validator funcs + Selector record.Selector // record selection funcs + + ctx context.Context + proc goprocess.Process + + strmap map[peer.ID]*messageSender + smlk sync.Mutex +} + +// NewDHT creates a new DHT object with the given peer as the 'local' host +func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { + dht := new(IpfsDHT) + dht.datastore = dstore + dht.self = h.ID() + dht.peerstore = h.Peerstore() + dht.host = h + + // register for network notifs. 
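+	// (the DHT itself implements inet.Notifiee, see notif.go below: connection
+	// events add the remote peer to the routing table, disconnections remove it)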
+ dht.host.Network().Notify((*netNotifiee)(dht)) + + dht.proc = goprocess.WithTeardown(func() error { + // remove ourselves from network notifs. + dht.host.Network().StopNotify((*netNotifiee)(dht)) + return nil + }) + + dht.strmap = make(map[peer.ID]*messageSender) + dht.ctx = ctx + + h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) + dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore) + dht.proc.AddChild(dht.providers.Process()) + goprocessctx.CloseAfterContext(dht.proc, ctx) + + dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore) + dht.birth = time.Now() + + dht.Validator = make(record.Validator) + dht.Validator["pk"] = record.PublicKeyValidator + + dht.Selector = make(record.Selector) + dht.Selector["pk"] = record.PublicKeySelector + + return dht +} + +// putValueToPeer stores the given key/value pair at the peer 'p' +func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, + key key.Key, rec *pb.Record) error { + + pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) + pmes.Record = rec + rpmes, err := dht.sendRequest(ctx, p, pmes) + switch err { + case ErrReadTimeout: + log.Warningf("read timeout: %s %s", p.Pretty(), key) + fallthrough + default: + return err + case nil: + break + } + + if err != nil { + return err + } + + if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) { + return errors.New("value not put correctly") + } + return nil +} + +var errInvalidRecord = errors.New("received invalid record") + +// getValueOrPeers queries a particular peer p for the value for +// key. It returns either the value or a list of closer peers. +// NOTE: It will update the dht's peerstore with any new addresses +// it finds for the given peer. +func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, + key key.Key) (*pb.Record, []pstore.PeerInfo, error) { + + pmes, err := dht.getValueSingle(ctx, p, key) + if err != nil { + return nil, nil, err + } + + // Perhaps we were given closer peers + peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers()) + + if record := pmes.GetRecord(); record != nil { + // Success! We were given the value + log.Debug("getValueOrPeers: got value") + + // make sure record is valid. + err = dht.verifyRecordOnline(ctx, record) + if err != nil { + log.Info("Received invalid record! 
(discarded)") + // return a sentinal to signify an invalid record was received + err = errInvalidRecord + record = new(pb.Record) + } + return record, peers, err + } + + if len(peers) > 0 { + log.Debug("getValueOrPeers: peers") + return nil, peers, nil + } + + log.Warning("getValueOrPeers: routing.ErrNotFound") + return nil, nil, routing.ErrNotFound +} + +// getValueSingle simply performs the get value RPC with the given parameters +func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, + key key.Key) (*pb.Message, error) { + defer log.EventBegin(ctx, "getValueSingle", p, &key).Done() + + pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Warningf("read timeout: %s %s", p.Pretty(), key) + fallthrough + default: + return nil, err + } +} + +// getLocal attempts to retrieve the value from the datastore +func (dht *IpfsDHT) getLocal(key key.Key) (*pb.Record, error) { + + log.Debug("getLocal %s", key) + v, err := dht.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + log.Debug("found in db") + + byt, ok := v.([]byte) + if !ok { + return nil, errors.New("value stored in datastore not []byte") + } + rec := new(pb.Record) + err = proto.Unmarshal(byt, rec) + if err != nil { + return nil, err + } + + err = dht.verifyRecordLocally(rec) + if err != nil { + log.Debugf("local record verify failed: %s (discarded)", err) + return nil, err + } + + return rec, nil +} + +// getOwnPrivateKey attempts to load the local peers private +// key from the peerstore. +func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) { + sk := dht.peerstore.PrivKey(dht.self) + if sk == nil { + log.Warningf("%s dht cannot get own private key!", dht.self) + return nil, fmt.Errorf("cannot get private key to sign record!") + } + return sk, nil +} + +// putLocal stores the key value pair in the datastore +func (dht *IpfsDHT) putLocal(key key.Key, rec *pb.Record) error { + data, err := proto.Marshal(rec) + if err != nil { + return err + } + + return dht.datastore.Put(key.DsKey(), data) +} + +// Update signals the routingTable to Update its last-seen status +// on the given peer. +func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { + log.Event(ctx, "updatePeer", p) + dht.routingTable.Update(p) +} + +// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. 
+func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo { + p := dht.routingTable.Find(id) + if p != "" { + return dht.peerstore.PeerInfo(p) + } + return pstore.PeerInfo{} +} + +// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is +func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { + defer log.EventBegin(ctx, "findPeerSingle", p, id).Done() + + pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Warningf("read timeout: %s %s", p.Pretty(), id) + fallthrough + default: + return nil, err + } +} + +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) { + defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done() + + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Warningf("read timeout: %s %s", p.Pretty(), key) + fallthrough + default: + return nil, err + } +} + +// nearestPeersToQuery returns the routing tables closest peers. +func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { + key := key.Key(pmes.GetKey()) + closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count) + return closer +} + +// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. +func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID { + closer := dht.nearestPeersToQuery(pmes, count) + + // no node? nil + if closer == nil { + return nil + } + + // == to self? thats bad + for _, p := range closer { + if p == dht.self { + log.Debug("attempted to return self! this shouldn't happen...") + return nil + } + } + + var filtered []peer.ID + for _, clp := range closer { + // Dont send a peer back themselves + if p == clp { + continue + } + + filtered = append(filtered, clp) + } + + // ok seems like closer nodes + return filtered +} + +// Context return dht's context +func (dht *IpfsDHT) Context() context.Context { + return dht.ctx +} + +// Process return dht's process +func (dht *IpfsDHT) Process() goprocess.Process { + return dht.proc +} + +// Close calls Process Close +func (dht *IpfsDHT) Close() error { + return dht.proc.Close() +} diff --git a/dht_bootstrap.go b/dht_bootstrap.go new file mode 100644 index 00000000000..af53097b697 --- /dev/null +++ b/dht_bootstrap.go @@ -0,0 +1,182 @@ +// Package dht implements a distributed hash table that satisfies the ipfs routing +// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications. +package dht + +import ( + "crypto/rand" + "fmt" + "sync" + "time" + + u "github.com/ipfs/go-ipfs-util" + routing "github.com/ipfs/go-ipfs/routing" + peer "github.com/ipfs/go-libp2p-peer" + + goprocess "github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/goprocess/periodic" + context "golang.org/x/net/context" +) + +// BootstrapConfig specifies parameters used bootstrapping the DHT. +// +// Note there is a tradeoff between the bootstrap period and the +// number of queries. We could support a higher period with less +// queries. +type BootstrapConfig struct { + Queries int // how many queries to run per period + Period time.Duration // how often to run periodi cbootstrap. 
+ Timeout time.Duration // how long to wait for a bootstrao query to run +} + +var DefaultBootstrapConfig = BootstrapConfig{ + // For now, this is set to 1 query. + // We are currently more interested in ensuring we have a properly formed + // DHT than making sure our dht minimizes traffic. Once we are more certain + // of our implementation's robustness, we should lower this down to 8 or 4. + Queries: 1, + + // For now, this is set to 1 minute, which is a medium period. We are + // We are currently more interested in ensuring we have a properly formed + // DHT than making sure our dht minimizes traffic. + Period: time.Duration(5 * time.Minute), + + Timeout: time.Duration(10 * time.Second), +} + +// Bootstrap ensures the dht routing table remains healthy as peers come and go. +// it builds up a list of peers by requesting random peer IDs. The Bootstrap +// process will run a number of queries each time, and run every time signal fires. +// These parameters are configurable. +// +// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface +func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { + proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig) + if err != nil { + return err + } + + // wait till ctx or dht.Context exits. + // we have to do it this way to satisfy the Routing interface (contexts) + go func() { + defer proc.Close() + select { + case <-ctx.Done(): + case <-dht.Context().Done(): + } + }() + + return nil +} + +// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go. +// it builds up a list of peers by requesting random peer IDs. The Bootstrap +// process will run a number of queries each time, and run every time signal fires. +// These parameters are configurable. +// +// BootstrapWithConfig returns a process, so the user can stop it. +func (dht *IpfsDHT) BootstrapWithConfig(config BootstrapConfig) (goprocess.Process, error) { + sig := time.Tick(config.Period) + return dht.BootstrapOnSignal(config, sig) +} + +// SignalBootstrap ensures the dht routing table remains healthy as peers come and go. +// it builds up a list of peers by requesting random peer IDs. The Bootstrap +// process will run a number of queries each time, and run every time signal fires. +// These parameters are configurable. +// +// SignalBootstrap returns a process, so the user can stop it. +func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { + if cfg.Queries <= 0 { + return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) + } + + if signal == nil { + return nil, fmt.Errorf("invalid signal: %v", signal) + } + + proc := periodicproc.Ticker(signal, func(worker goprocess.Process) { + // it would be useful to be able to send out signals of when we bootstrap, too... + // maybe this is a good case for whole module event pub/sub? + + ctx := dht.Context() + if err := dht.runBootstrap(ctx, cfg); err != nil { + log.Warning(err) + // A bootstrapping error is important to notice but not fatal. 
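+			// the ticker fires again on the next signal, so a failed round is simply retried then.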
+ } + }) + + return proc, nil +} + +// runBootstrap builds up list of peers by requesting random peer IDs +func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { + bslog := func(msg string) { + log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) + } + bslog("start") + defer bslog("end") + defer log.EventBegin(ctx, "dhtRunBootstrap").Done() + + var merr u.MultiErr + + randomID := func() peer.ID { + // 16 random bytes is not a valid peer id. it may be fine becuase + // the dht will rehash to its own keyspace anyway. + id := make([]byte, 16) + rand.Read(id) + id = u.Hash(id) + return peer.ID(id) + } + + // bootstrap sequentially, as results will compound + ctx, cancel := context.WithTimeout(ctx, cfg.Timeout) + defer cancel() + runQuery := func(ctx context.Context, id peer.ID) { + p, err := dht.FindPeer(ctx, id) + if err == routing.ErrNotFound { + // this isn't an error. this is precisely what we expect. + } else if err != nil { + merr = append(merr, err) + } else { + // woah, actually found a peer with that ID? this shouldn't happen normally + // (as the ID we use is not a real ID). this is an odd error worth logging. + err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p) + log.Warningf("%s", err) + merr = append(merr, err) + } + } + + sequential := true + if sequential { + // these should be parallel normally. but can make them sequential for debugging. + // note that the core/bootstrap context deadline should be extended too for that. + for i := 0; i < cfg.Queries; i++ { + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) + runQuery(ctx, id) + } + + } else { + // note on parallelism here: the context is passed in to the queries, so they + // **should** exit when it exceeds, making this function exit on ctx cancel. + // normally, we should be selecting on ctx.Done() here too, but this gets + // complicated to do with WaitGroup, and doesnt wait for the children to exit. + var wg sync.WaitGroup + for i := 0; i < cfg.Queries; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) + runQuery(ctx, id) + }() + } + wg.Wait() + } + + if len(merr) > 0 { + return merr + } + return nil +} diff --git a/dht_net.go b/dht_net.go new file mode 100644 index 00000000000..211d1167798 --- /dev/null +++ b/dht_net.go @@ -0,0 +1,250 @@ +package dht + +import ( + "fmt" + "sync" + "time" + + ggio "github.com/gogo/protobuf/io" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + peer "github.com/ipfs/go-libp2p-peer" + inet "github.com/ipfs/go-libp2p/p2p/net" + ctxio "github.com/jbenet/go-context/io" + context "golang.org/x/net/context" +) + +var dhtReadMessageTimeout = time.Minute +var ErrReadTimeout = fmt.Errorf("timed out reading response") + +// handleNewStream implements the inet.StreamHandler +func (dht *IpfsDHT) handleNewStream(s inet.Stream) { + go dht.handleNewMessage(s) +} + +func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { + defer s.Close() + + ctx := dht.Context() + cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func + cw := ctxio.NewWriter(ctx, s) // ok to use. 
we defer close stream in this func + r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(cw) + mPeer := s.Conn().RemotePeer() + + for { + // receive msg + pmes := new(pb.Message) + if err := r.ReadMsg(pmes); err != nil { + log.Debugf("Error unmarshaling data: %s", err) + return + } + + // update the peer (on valid msgs only) + dht.updateFromMessage(ctx, mPeer, pmes) + + // get handler for this msg type. + handler := dht.handlerForMsgType(pmes.GetType()) + if handler == nil { + log.Debug("got back nil handler from handlerForMsgType") + return + } + + // dispatch handler. + rpmes, err := handler(ctx, mPeer, pmes) + if err != nil { + log.Debugf("handle message error: %s", err) + return + } + + // if nil response, return it before serializing + if rpmes == nil { + log.Debug("got back nil response from request") + continue + } + + // send out response msg + if err := w.WriteMsg(rpmes); err != nil { + log.Debugf("send response error: %s", err) + return + } + } +} + +// sendRequest sends out a request, but also makes sure to +// measure the RTT for latency measurements. +func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + + ms := dht.messageSenderForPeer(p) + + start := time.Now() + + rpmes, err := ms.SendRequest(ctx, pmes) + if err != nil { + return nil, err + } + + // update the peer (on valid msgs only) + dht.updateFromMessage(ctx, p, rpmes) + + dht.peerstore.RecordLatency(p, time.Since(start)) + log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes) + return rpmes, nil +} + +// sendMessage sends out a message +func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { + + ms := dht.messageSenderForPeer(p) + + if err := ms.SendMessage(ctx, pmes); err != nil { + return err + } + log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) + return nil +} + +func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error { + dht.Update(ctx, p) + return nil +} + +func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) *messageSender { + dht.smlk.Lock() + defer dht.smlk.Unlock() + + ms, ok := dht.strmap[p] + if !ok { + ms = dht.newMessageSender(p) + dht.strmap[p] = ms + } + + return ms +} + +type messageSender struct { + s inet.Stream + r ggio.ReadCloser + w ggio.WriteCloser + lk sync.Mutex + p peer.ID + dht *IpfsDHT + + singleMes int +} + +func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender { + return &messageSender{p: p, dht: dht} +} + +func (ms *messageSender) prep() error { + if ms.s != nil { + return nil + } + + nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ProtocolDHT, ms.p) + if err != nil { + return err + } + + ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) + ms.w = ggio.NewDelimitedWriter(nstr) + ms.s = nstr + + return nil +} + +// streamReuseTries is the number of times we will try to reuse a stream to a +// given peer before giving up and reverting to the old one-message-per-stream +// behaviour. 
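+// writeMessage increments messageSender.singleMes each time a reused stream has to be
+// re-opened; once that counter passes this threshold, SendMessage and SendRequest close
+// the stream after every message.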
+const streamReuseTries = 3 + +func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error { + ms.lk.Lock() + defer ms.lk.Unlock() + if err := ms.prep(); err != nil { + return err + } + + if err := ms.writeMessage(pmes); err != nil { + return err + } + + if ms.singleMes > streamReuseTries { + ms.s.Close() + ms.s = nil + } + + return nil +} + +func (ms *messageSender) writeMessage(pmes *pb.Message) error { + err := ms.w.WriteMsg(pmes) + if err != nil { + // If the other side isnt expecting us to be reusing streams, we're gonna + // end up erroring here. To make sure things work seamlessly, lets retry once + // before continuing + + log.Infof("error writing message: ", err) + ms.s.Close() + ms.s = nil + if err := ms.prep(); err != nil { + return err + } + + if err := ms.w.WriteMsg(pmes); err != nil { + return err + } + + // keep track of this happening. If it happens a few times, its + // likely we can assume the otherside will never support stream reuse + ms.singleMes++ + } + return nil +} + +func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { + ms.lk.Lock() + defer ms.lk.Unlock() + if err := ms.prep(); err != nil { + return nil, err + } + + if err := ms.writeMessage(pmes); err != nil { + return nil, err + } + + log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) + + mes := new(pb.Message) + if err := ms.ctxReadMsg(ctx, mes); err != nil { + ms.s.Close() + ms.s = nil + return nil, err + } + + if ms.singleMes > streamReuseTries { + ms.s.Close() + ms.s = nil + } + + return mes, nil +} + +func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { + errc := make(chan error, 1) + go func(r ggio.ReadCloser) { + errc <- r.ReadMsg(mes) + }(ms.r) + + t := time.NewTimer(dhtReadMessageTimeout) + defer t.Stop() + + select { + case err := <-errc: + return err + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return ErrReadTimeout + } +} diff --git a/dht_test.go b/dht_test.go new file mode 100644 index 00000000000..68856ac4e08 --- /dev/null +++ b/dht_test.go @@ -0,0 +1,828 @@ +package dht + +import ( + "bytes" + "fmt" + "math/rand" + "sort" + "sync" + "testing" + "time" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + key "github.com/ipfs/go-ipfs/blocks/key" + routing "github.com/ipfs/go-ipfs/routing" + record "github.com/ipfs/go-ipfs/routing/record" + ci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci" + travisci "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis" + + u "github.com/ipfs/go-ipfs-util" + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + netutil "github.com/ipfs/go-libp2p/p2p/test/util" + ma "github.com/jbenet/go-multiaddr" + context "golang.org/x/net/context" +) + +var testCaseValues = map[key.Key][]byte{} + +func init() { + testCaseValues["hello"] = []byte("world") + for i := 0; i < 100; i++ { + k := fmt.Sprintf("%d -- key", i) + v := fmt.Sprintf("%d -- value", i) + testCaseValues[key.Key(k)] = []byte(v) + } +} + +func setupDHT(ctx context.Context, t *testing.T) *IpfsDHT { + h := netutil.GenHostSwarm(t, ctx) + + dss := dssync.MutexWrap(ds.NewMapDatastore()) + d := NewDHT(ctx, h, dss) + + d.Validator["v"] = &record.ValidChecker{ + Func: func(key.Key, []byte) error { + return nil + }, + Sign: false, + } + return d +} + +func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { + addrs := make([]ma.Multiaddr, n) + dhts := make([]*IpfsDHT, n) + peers := make([]peer.ID, n) + 
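+	// sanity check: every generated host must come up with a distinct address and peer ID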
+ sanityAddrsMap := make(map[string]struct{}) + sanityPeersMap := make(map[string]struct{}) + + for i := 0; i < n; i++ { + dhts[i] = setupDHT(ctx, t) + peers[i] = dhts[i].self + addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0] + + if _, lol := sanityAddrsMap[addrs[i].String()]; lol { + t.Fatal("While setting up DHTs address got duplicated.") + } else { + sanityAddrsMap[addrs[i].String()] = struct{}{} + } + if _, lol := sanityPeersMap[peers[i].String()]; lol { + t.Fatal("While setting up DHTs peerid got duplicated.") + } else { + sanityPeersMap[peers[i].String()] = struct{}{} + } + } + + return addrs, peers, dhts +} + +func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { + + idB := b.self + addrB := b.peerstore.Addrs(idB) + if len(addrB) == 0 { + t.Fatal("peers setup incorrectly: no local address") + } + + a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL) + pi := pstore.PeerInfo{ID: idB} + if err := a.host.Connect(ctx, pi); err != nil { + t.Fatal(err) + } + + // loop until connection notification has been received. + // under high load, this may not happen as immediately as we would like. + for a.routingTable.Find(b.self) == "" { + time.Sleep(time.Millisecond * 5) + } + + for b.routingTable.Find(a.self) == "" { + time.Sleep(time.Millisecond * 5) + } +} + +func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { + + ctx, cancel := context.WithCancel(ctx) + log.Debugf("Bootstrapping DHTs...") + + // tried async. sequential fares much better. compare: + // 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2 + // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd + // probably because results compound + + var cfg BootstrapConfig + cfg = DefaultBootstrapConfig + cfg.Queries = 3 + + start := rand.Intn(len(dhts)) // randomize to decrease bias. 
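+	// run one synchronous bootstrap round per DHT, starting from a random offset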
+ for i := range dhts { + dht := dhts[(start+i)%len(dhts)] + dht.runBootstrap(ctx, cfg) + } + cancel() +} + +func TestValueGetSet(t *testing.T) { + // t.Skip("skipping test to debug another") + + ctx := context.Background() + + dhtA := setupDHT(ctx, t) + dhtB := setupDHT(ctx, t) + + defer dhtA.Close() + defer dhtB.Close() + defer dhtA.host.Close() + defer dhtB.host.Close() + + vf := &record.ValidChecker{ + Func: func(key.Key, []byte) error { + return nil + }, + Sign: false, + } + nulsel := func(_ key.Key, bs [][]byte) (int, error) { + return 0, nil + } + + dhtA.Validator["v"] = vf + dhtB.Validator["v"] = vf + dhtA.Selector["v"] = nulsel + dhtB.Selector["v"] = nulsel + + connect(t, ctx, dhtA, dhtB) + + ctxT, _ := context.WithTimeout(ctx, time.Second) + dhtA.PutValue(ctxT, "/v/hello", []byte("world")) + + ctxT, _ = context.WithTimeout(ctx, time.Second*2) + val, err := dhtA.GetValue(ctxT, "/v/hello") + if err != nil { + t.Fatal(err) + } + + if string(val) != "world" { + t.Fatalf("Expected 'world' got '%s'", string(val)) + } + + ctxT, _ = context.WithTimeout(ctx, time.Second*2) + val, err = dhtB.GetValue(ctxT, "/v/hello") + if err != nil { + t.Fatal(err) + } + + if string(val) != "world" { + t.Fatalf("Expected 'world' got '%s'", string(val)) + } +} + +func TestProvides(t *testing.T) { + // t.Skip("skipping test to debug another") + ctx := context.Background() + + _, _, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + + for k, v := range testCaseValues { + log.Debugf("adding local values for %s = %s", k, v) + sk := dhts[3].peerstore.PrivKey(dhts[3].self) + rec, err := record.MakePutRecord(sk, k, v, false) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].putLocal(k, rec) + if err != nil { + t.Fatal(err) + } + + bits, err := dhts[3].getLocal(k) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(bits.GetValue(), v) { + t.Fatalf("didn't store the right bits (%s, %s)", k, v) + } + } + + for k := range testCaseValues { + log.Debugf("announcing provider for %s", k) + if err := dhts[3].Provide(ctx, k); err != nil { + t.Fatal(err) + } + } + + // what is this timeout for? was 60ms before. + time.Sleep(time.Millisecond * 6) + + n := 0 + for k := range testCaseValues { + n = (n + 1) % 3 + + log.Debugf("getting providers for %s from %d", k, n) + ctxT, _ := context.WithTimeout(ctx, time.Second) + provchan := dhts[n].FindProvidersAsync(ctxT, k, 1) + + select { + case prov := <-provchan: + if prov.ID == "" { + t.Fatal("Got back nil provider") + } + if prov.ID != dhts[3].self { + t.Fatal("Got back wrong provider") + } + case <-ctxT.Done(): + t.Fatal("Did not get a provider back.") + } + } +} + +// if minPeers or avgPeers is 0, dont test for it. 
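+// waitForWellFormedTables polls every 5ms until each routing table holds at least minPeers
+// entries and the average table size reaches avgPeers, or gives up after timeout.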
+func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool { + // test "well-formed-ness" (>= minPeers peers in every routing table) + + checkTables := func() bool { + totalPeers := 0 + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + totalPeers += rtlen + if minPeers > 0 && rtlen < minPeers { + t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) + return false + } + } + actualAvgPeers := totalPeers / len(dhts) + t.Logf("avg rt size: %d", actualAvgPeers) + if avgPeers > 0 && actualAvgPeers < avgPeers { + t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers) + return false + } + return true + } + + timeoutA := time.After(timeout) + for { + select { + case <-timeoutA: + log.Debugf("did not reach well-formed routing tables by %s", timeout) + return false // failed + case <-time.After(5 * time.Millisecond): + if checkTables() { + return true // succeeded + } + } + } +} + +func printRoutingTables(dhts []*IpfsDHT) { + // the routing tables should be full now. let's inspect them. + fmt.Printf("checking routing table of %d\n", len(dhts)) + for _, dht := range dhts { + fmt.Printf("checking routing table of %s\n", dht.self) + dht.routingTable.Print() + fmt.Println("") + } +} + +func TestBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + nDHTs := 30 + _, _, dhts := setupDHTS(ctx, nDHTs, t) + defer func() { + for i := 0; i < nDHTs; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + t.Logf("connecting %d dhts in a ring", nDHTs) + for i := 0; i < nDHTs; i++ { + connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)]) + } + + <-time.After(100 * time.Millisecond) + // bootstrap a few times until we get good tables. + stop := make(chan struct{}) + go func() { + for { + t.Logf("bootstrapping them so they find each other", nDHTs) + ctxT, _ := context.WithTimeout(ctx, 5*time.Second) + bootstrap(t, ctxT, dhts) + + select { + case <-time.After(50 * time.Millisecond): + continue // being explicit + case <-stop: + return + } + } + }() + + waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second) + close(stop) + + if u.Debug { + // the routing tables should be full now. let's inspect them. + printRoutingTables(dhts) + } +} + +func TestPeriodicBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") + if ci.IsRunning() { + t.Skip("skipping on CI. highly timing dependent") + } + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + nDHTs := 30 + _, _, dhts := setupDHTS(ctx, nDHTs, t) + defer func() { + for i := 0; i < nDHTs; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + // signal amplifier + amplify := func(signal chan time.Time, other []chan time.Time) { + for t := range signal { + for _, s := range other { + s <- t + } + } + for _, s := range other { + close(s) + } + } + + signal := make(chan time.Time) + allSignals := []chan time.Time{} + + var cfg BootstrapConfig + cfg = DefaultBootstrapConfig + cfg.Queries = 5 + + // kick off periodic bootstrappers with instrumented signals. + for _, dht := range dhts { + s := make(chan time.Time) + allSignals = append(allSignals, s) + dht.BootstrapOnSignal(cfg, s) + } + go amplify(signal, allSignals) + + t.Logf("dhts are not connected.", nDHTs) + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + if rtlen > 0 { + t.Errorf("routing table for %s should have 0 peers. 
has %d", dht.self, rtlen) + } + } + + for i := 0; i < nDHTs; i++ { + connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)]) + } + + t.Logf("DHTs are now connected to 1-2 others.", nDHTs) + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + if rtlen > 2 { + t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen) + } + } + + if u.Debug { + printRoutingTables(dhts) + } + + t.Logf("bootstrapping them so they find each other", nDHTs) + signal <- time.Now() + + // this is async, and we dont know when it's finished with one cycle, so keep checking + // until the routing tables look better, or some long timeout for the failure case. + waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second) + + if u.Debug { + printRoutingTables(dhts) + } +} + +func TestProvidesMany(t *testing.T) { + t.Skip("this test doesn't work") + // t.Skip("skipping test to debug another") + ctx := context.Background() + + nDHTs := 40 + _, _, dhts := setupDHTS(ctx, nDHTs, t) + defer func() { + for i := 0; i < nDHTs; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + t.Logf("connecting %d dhts in a ring", nDHTs) + for i := 0; i < nDHTs; i++ { + connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)]) + } + + <-time.After(100 * time.Millisecond) + t.Logf("bootstrapping them so they find each other", nDHTs) + ctxT, _ := context.WithTimeout(ctx, 20*time.Second) + bootstrap(t, ctxT, dhts) + + if u.Debug { + // the routing tables should be full now. let's inspect them. + t.Logf("checking routing table of %d", nDHTs) + for _, dht := range dhts { + fmt.Printf("checking routing table of %s\n", dht.self) + dht.routingTable.Print() + fmt.Println("") + } + } + + var providers = map[key.Key]peer.ID{} + + d := 0 + for k, v := range testCaseValues { + d = (d + 1) % len(dhts) + dht := dhts[d] + providers[k] = dht.self + + t.Logf("adding local values for %s = %s (on %s)", k, v, dht.self) + rec, err := record.MakePutRecord(nil, k, v, false) + if err != nil { + t.Fatal(err) + } + + err = dht.putLocal(k, rec) + if err != nil { + t.Fatal(err) + } + + bits, err := dht.getLocal(k) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(bits.GetValue(), v) { + t.Fatalf("didn't store the right bits (%s, %s)", k, v) + } + + t.Logf("announcing provider for %s", k) + if err := dht.Provide(ctx, k); err != nil { + t.Fatal(err) + } + } + + // what is this timeout for? was 60ms before. + time.Sleep(time.Millisecond * 6) + + errchan := make(chan error) + + ctxT, _ = context.WithTimeout(ctx, 5*time.Second) + + var wg sync.WaitGroup + getProvider := func(dht *IpfsDHT, k key.Key) { + defer wg.Done() + + expected := providers[k] + + provchan := dht.FindProvidersAsync(ctxT, k, 1) + select { + case prov := <-provchan: + actual := prov.ID + if actual == "" { + errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self) + } else if actual != expected { + errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)", + expected, actual, k, dht.self) + } + case <-ctxT.Done(): + errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self) + } + } + + for k := range testCaseValues { + // everyone should be able to find it... 
+ for _, dht := range dhts { + log.Debugf("getting providers for %s at %s", k, dht.self) + wg.Add(1) + go getProvider(dht, k) + } + } + + // we need this because of printing errors + go func() { + wg.Wait() + close(errchan) + }() + + for err := range errchan { + t.Error(err) + } +} + +func TestProvidesAsync(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + _, _, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + + k := key.Key("hello") + val := []byte("world") + sk := dhts[3].peerstore.PrivKey(dhts[3].self) + rec, err := record.MakePutRecord(sk, k, val, false) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].putLocal(k, rec) + if err != nil { + t.Fatal(err) + } + + bits, err := dhts[3].getLocal(k) + if err != nil && bytes.Equal(bits.GetValue(), val) { + t.Fatal(err) + } + + err = dhts[3].Provide(ctx, key.Key("hello")) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 60) + + ctxT, _ := context.WithTimeout(ctx, time.Millisecond*300) + provs := dhts[0].FindProvidersAsync(ctxT, key.Key("hello"), 5) + select { + case p, ok := <-provs: + if !ok { + t.Fatal("Provider channel was closed...") + } + if p.ID == "" { + t.Fatal("Got back nil provider!") + } + if p.ID != dhts[3].self { + t.Fatalf("got a provider, but not the right one. %s", p) + } + case <-ctxT.Done(): + t.Fatal("Didnt get back providers") + } +} + +func TestLayeredGet(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + _, _, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + + err := dhts[3].Provide(ctx, key.Key("/v/hello")) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 6) + + t.Log("interface was changed. 
GetValue should not use providers.") + ctxT, _ := context.WithTimeout(ctx, time.Second) + val, err := dhts[0].GetValue(ctxT, key.Key("/v/hello")) + if err != routing.ErrNotFound { + t.Error(err) + } + if string(val) == "world" { + t.Error("should not get value.") + } + if len(val) > 0 && string(val) != "world" { + t.Error("worse, there's a value and its not even the right one.") + } +} + +func TestFindPeer(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + _, peers, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + + ctxT, _ := context.WithTimeout(ctx, time.Second) + p, err := dhts[0].FindPeer(ctxT, peers[2]) + if err != nil { + t.Fatal(err) + } + + if p.ID == "" { + t.Fatal("Failed to find peer.") + } + + if p.ID != peers[2] { + t.Fatal("Didnt find expected peer.") + } +} + +func TestFindPeersConnectedToPeer(t *testing.T) { + t.Skip("not quite correct (see note)") + + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + _, peers, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + dhts[i].host.Close() + } + }() + + // topology: + // 0-1, 1-2, 1-3, 2-3 + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[1], dhts[3]) + connect(t, ctx, dhts[2], dhts[3]) + + // fmt.Println("0 is", peers[0]) + // fmt.Println("1 is", peers[1]) + // fmt.Println("2 is", peers[2]) + // fmt.Println("3 is", peers[3]) + + ctxT, _ := context.WithTimeout(ctx, time.Second) + pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2]) + if err != nil { + t.Fatal(err) + } + + // shouldFind := []peer.ID{peers[1], peers[3]} + var found []pstore.PeerInfo + for nextp := range pchan { + found = append(found, nextp) + } + + // fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2]) + // fmt.Println("should find 1, 3", shouldFind) + // fmt.Println("found", found) + + // testPeerListsMatch(t, shouldFind, found) + + log.Warning("TestFindPeersConnectedToPeer is not quite correct") + if len(found) == 0 { + t.Fatal("didn't find any peers.") + } +} + +func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) { + + if len(p1) != len(p2) { + t.Fatal("did not find as many peers as should have", p1, p2) + } + + ids1 := make([]string, len(p1)) + ids2 := make([]string, len(p2)) + + for i, p := range p1 { + ids1[i] = string(p) + } + + for i, p := range p2 { + ids2[i] = string(p) + } + + sort.Sort(sort.StringSlice(ids1)) + sort.Sort(sort.StringSlice(ids2)) + + for i := range ids1 { + if ids1[i] != ids2[i] { + t.Fatal("Didnt find expected peer", ids1[i], ids2) + } + } +} + +func TestConnectCollision(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + if travisci.IsRunning() { + t.Skip("Skipping on Travis-CI.") + } + + runTimes := 10 + + for rtime := 0; rtime < runTimes; rtime++ { + log.Info("Running Time: ", rtime) + + ctx := context.Background() + + dhtA := setupDHT(ctx, t) + dhtB := setupDHT(ctx, t) + + addrA := dhtA.peerstore.Addrs(dhtA.self)[0] + addrB := dhtB.peerstore.Addrs(dhtB.self)[0] + + peerA := dhtA.self + peerB := dhtB.self + + errs := make(chan error) + go func() { + dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL) + pi := pstore.PeerInfo{ID: peerB} + err := 
dhtA.host.Connect(ctx, pi) + errs <- err + }() + go func() { + dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL) + pi := pstore.PeerInfo{ID: peerA} + err := dhtB.host.Connect(ctx, pi) + errs <- err + }() + + timeout := time.After(5 * time.Second) + select { + case e := <-errs: + if e != nil { + t.Fatal(e) + } + case <-timeout: + t.Fatal("Timeout received!") + } + select { + case e := <-errs: + if e != nil { + t.Fatal(e) + } + case <-timeout: + t.Fatal("Timeout received!") + } + + dhtA.Close() + dhtB.Close() + dhtA.host.Close() + dhtB.host.Close() + } +} diff --git a/ext_test.go b/ext_test.go new file mode 100644 index 00000000000..fcf8548345e --- /dev/null +++ b/ext_test.go @@ -0,0 +1,290 @@ +package dht + +import ( + "io" + "math/rand" + "testing" + "time" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + key "github.com/ipfs/go-ipfs/blocks/key" + routing "github.com/ipfs/go-ipfs/routing" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + record "github.com/ipfs/go-ipfs/routing/record" + + ggio "github.com/gogo/protobuf/io" + u "github.com/ipfs/go-ipfs-util" + pstore "github.com/ipfs/go-libp2p-peerstore" + inet "github.com/ipfs/go-libp2p/p2p/net" + mocknet "github.com/ipfs/go-libp2p/p2p/net/mock" + context "golang.org/x/net/context" +) + +func TestGetFailures(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + mn, err := mocknet.FullMeshConnected(ctx, 2) + if err != nil { + t.Fatal(err) + } + hosts := mn.Hosts() + + tsds := dssync.MutexWrap(ds.NewMapDatastore()) + d := NewDHT(ctx, hosts[0], tsds) + d.Update(ctx, hosts[1].ID()) + + // Reply with failures to every message + hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) { + s.Close() + }) + + // This one should time out + ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond) + if _, err := d.GetValue(ctx1, key.Key("test")); err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } + + if err != io.EOF { + t.Fatal("Got different error than we expected", err) + } + } else { + t.Fatal("Did not get expected error!") + } + + t.Log("Timeout test passed.") + + // Reply with failures to every message + hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) { + defer s.Close() + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) + } + + resp := &pb.Message{ + Type: pmes.Type, + } + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + }) + + // This one should fail with NotFound. + // long context timeout to ensure we dont end too early. + // the dht should be exhausting its query and returning not found. + // (was 3 seconds before which should be _plenty_ of time, but maybe + // travis machines really have a hard time...) 
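+	// the handler above replies with an empty message of the matching type, so the
+	// query can only finish by exhausting its peers and returning routing.ErrNotFound.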
+ ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second) + _, err = d.GetValue(ctx2, key.Key("test")) + if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } + if err != routing.ErrNotFound { + t.Fatalf("Expected ErrNotFound, got: %s", err) + } + } else { + t.Fatal("expected error, got none.") + } + + t.Log("ErrNotFound check passed!") + + // Now we test this DHT's handleGetValue failure + { + typ := pb.Message_GET_VALUE + str := "hello" + + sk, err := d.getOwnPrivateKey() + if err != nil { + t.Fatal(err) + } + + rec, err := record.MakePutRecord(sk, key.Key(str), []byte("blah"), true) + if err != nil { + t.Fatal(err) + } + req := pb.Message{ + Type: &typ, + Key: &str, + Record: rec, + } + + s, err := hosts[1].NewStream(context.Background(), ProtocolDHT, hosts[0].ID()) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + + if err := pbw.WriteMsg(&req); err != nil { + t.Fatal(err) + } + + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + t.Fatal(err) + } + if pmes.GetRecord() != nil { + t.Fatal("shouldnt have value") + } + if pmes.GetProviderPeers() != nil { + t.Fatal("shouldnt have provider peers") + } + } +} + +func TestNotFound(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + mn, err := mocknet.FullMeshConnected(ctx, 16) + if err != nil { + t.Fatal(err) + } + hosts := mn.Hosts() + tsds := dssync.MutexWrap(ds.NewMapDatastore()) + d := NewDHT(ctx, hosts[0], tsds) + + for _, p := range hosts { + d.Update(ctx, p.ID()) + } + + // Reply with random peers to every message + for _, host := range hosts { + host := host // shadow loop var + host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) { + defer s.Close() + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) + } + + switch pmes.GetType() { + case pb.Message_GET_VALUE: + resp := &pb.Message{Type: pmes.Type} + + ps := []pstore.PeerInfo{} + for i := 0; i < 7; i++ { + p := hosts[rand.Intn(len(hosts))].ID() + pi := host.Peerstore().PeerInfo(p) + ps = append(ps, pi) + } + + resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps) + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + + default: + panic("Shouldnt recieve this.") + } + }) + } + + // long timeout to ensure timing is not at play. + ctx, cancel := context.WithTimeout(ctx, time.Second*20) + defer cancel() + v, err := d.GetValue(ctx, key.Key("hello")) + log.Debugf("get value got %v", v) + if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } + switch err { + case routing.ErrNotFound: + //Success! 
+ return + case u.ErrTimeout: + t.Fatal("Should not have gotten timeout!") + default: + t.Fatalf("Got unexpected error: %s", err) + } + } + t.Fatal("Expected to recieve an error.") +} + +// If less than K nodes are in the entire network, it should fail when we make +// a GET rpc and nobody has the value +func TestLessThanKResponses(t *testing.T) { + // t.Skip("skipping test to debug another") + // t.Skip("skipping test because it makes a lot of output") + + ctx := context.Background() + mn, err := mocknet.FullMeshConnected(ctx, 6) + if err != nil { + t.Fatal(err) + } + hosts := mn.Hosts() + + tsds := dssync.MutexWrap(ds.NewMapDatastore()) + d := NewDHT(ctx, hosts[0], tsds) + + for i := 1; i < 5; i++ { + d.Update(ctx, hosts[i].ID()) + } + + // Reply with random peers to every message + for _, host := range hosts { + host := host // shadow loop var + host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) { + defer s.Close() + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) + } + + switch pmes.GetType() { + case pb.Message_GET_VALUE: + pi := host.Peerstore().PeerInfo(hosts[1].ID()) + resp := &pb.Message{ + Type: pmes.Type, + CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}), + } + + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + default: + panic("Shouldnt recieve this.") + } + + }) + } + + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + if _, err := d.GetValue(ctx, key.Key("hello")); err != nil { + switch err { + case routing.ErrNotFound: + //Success! + return + case u.ErrTimeout: + t.Fatal("Should not have gotten timeout!") + default: + t.Fatalf("Got unexpected error: %s", err) + } + } + t.Fatal("Expected to recieve an error.") +} diff --git a/handlers.go b/handlers.go new file mode 100644 index 00000000000..7924d99f1d0 --- /dev/null +++ b/handlers.go @@ -0,0 +1,287 @@ +package dht + +import ( + "errors" + "fmt" + "time" + + ds "github.com/ipfs/go-datastore" + key "github.com/ipfs/go-ipfs/blocks/key" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + lgbl "github.com/ipfs/go-ipfs/thirdparty/loggables" + + proto "github.com/gogo/protobuf/proto" + u "github.com/ipfs/go-ipfs-util" + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + context "golang.org/x/net/context" +) + +// The number of closer peers to send on requests. +var CloserPeerCount = KValue + +// dhthandler specifies the signature of functions that handle DHT messages. +type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) + +func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { + switch t { + case pb.Message_GET_VALUE: + return dht.handleGetValue + case pb.Message_PUT_VALUE: + return dht.handlePutValue + case pb.Message_FIND_NODE: + return dht.handleFindPeer + case pb.Message_ADD_PROVIDER: + return dht.handleAddProvider + case pb.Message_GET_PROVIDERS: + return dht.handleGetProviders + case pb.Message_PING: + return dht.handlePing + default: + return nil + } +} + +func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleGetValue", p).Done() + log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey()) + + // setup response + resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) + + // first, is there even a key? 
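+	// an empty key is treated as a malformed request and rejected outright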
+ k := key.Key(pmes.GetKey()) + if k == "" { + return nil, errors.New("handleGetValue but no key was provided") + // TODO: send back an error response? could be bad, but the other node's hanging. + } + + rec, err := dht.checkLocalDatastore(k) + if err != nil { + return nil, err + } + resp.Record = rec + + // Find closest peer on given cluster to desired key and reply with that info + closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) + if len(closer) > 0 { + closerinfos := pstore.PeerInfos(dht.peerstore, closer) + for _, pi := range closerinfos { + log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID) + if len(pi.Addrs) < 1 { + log.Errorf(`no addresses on peer being sent! + [local:%s] + [sending:%s] + [remote:%s]`, dht.self, pi.ID, p) + } + } + + resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos) + } + + return resp, nil +} + +func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) { + log.Debugf("%s handleGetValue looking into ds", dht.self) + dskey := k.DsKey() + iVal, err := dht.datastore.Get(dskey) + log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal) + + if err == ds.ErrNotFound { + return nil, nil + } + + // if we got an unexpected error, bail. + if err != nil { + return nil, err + } + + // if we have the value, send it back + log.Debugf("%s handleGetValue success!", dht.self) + + byts, ok := iVal.([]byte) + if !ok { + return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey) + } + + rec := new(pb.Record) + err = proto.Unmarshal(byts, rec) + if err != nil { + log.Debug("failed to unmarshal DHT record from datastore") + return nil, err + } + + // if its our record, dont bother checking the times on it + if peer.ID(rec.GetAuthor()) == dht.self { + return rec, nil + } + + var recordIsBad bool + recvtime, err := u.ParseRFC3339(rec.GetTimeReceived()) + if err != nil { + log.Info("either no receive time set on record, or it was invalid: ", err) + recordIsBad = true + } + + if time.Now().Sub(recvtime) > MaxRecordAge { + log.Debug("old record found, tossing.") + recordIsBad = true + } + + // NOTE: We do not verify the record here beyond checking these timestamps. + // we put the burden of checking the records on the requester as checking a record + // may be computationally expensive + + if recordIsBad { + err := dht.datastore.Delete(dskey) + if err != nil { + log.Error("Failed to delete bad record from datastore: ", err) + } + + return nil, nil // can treat this as not having the record at all + } + + return rec, nil +} + +// Store a value in this peer local storage +func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handlePutValue", p).Done() + dskey := key.Key(pmes.GetKey()).DsKey() + + if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil { + log.Warningf("Bad dht record in PUT from: %s. 
%s", key.Key(pmes.GetRecord().GetAuthor()), err) + return nil, err + } + + rec := pmes.GetRecord() + + // record the time we receive every record + rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now())) + + data, err := proto.Marshal(rec) + if err != nil { + return nil, err + } + + err = dht.datastore.Put(dskey, data) + log.Debugf("%s handlePutValue %v", dht.self, dskey) + return pmes, err +} + +func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + log.Debugf("%s Responding to ping from %s!\n", dht.self, p) + return pmes, nil +} + +func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleFindPeer", p).Done() + resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) + var closest []peer.ID + + // if looking for self... special case where we send it on CloserPeers. + if peer.ID(pmes.GetKey()) == dht.self { + closest = []peer.ID{dht.self} + } else { + closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount) + } + + if closest == nil { + log.Infof("%s handleFindPeer %s: could not find anything.", dht.self, p) + return resp, nil + } + + var withAddresses []pstore.PeerInfo + closestinfos := pstore.PeerInfos(dht.peerstore, closest) + for _, pi := range closestinfos { + if len(pi.Addrs) > 0 { + withAddresses = append(withAddresses, pi) + log.Debugf("handleFindPeer: sending back '%s'", pi.ID) + } + } + + resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses) + return resp, nil +} + +func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + lm := make(lgbl.DeferredMap) + lm["peer"] = func() interface{} { return p.Pretty() } + defer log.EventBegin(ctx, "handleGetProviders", lm).Done() + + resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) + key := key.Key(pmes.GetKey()) + lm["key"] = func() interface{} { return key.B58String() } + + // debug logging niceness. + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key) + log.Debugf("%s begin", reqDesc) + defer log.Debugf("%s end", reqDesc) + + // check if we have this value, to add ourselves as provider. + has, err := dht.datastore.Has(key.DsKey()) + if err != nil && err != ds.ErrNotFound { + log.Debugf("unexpected datastore error: %v\n", err) + has = false + } + + // setup providers + providers := dht.providers.GetProviders(ctx, key) + if has { + providers = append(providers, dht.self) + log.Debugf("%s have the value. added self as provider", reqDesc) + } + + if providers != nil && len(providers) > 0 { + infos := pstore.PeerInfos(dht.peerstore, providers) + resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos) + } + + // Also send closer peers. 
+ closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) + if closer != nil { + infos := pstore.PeerInfos(dht.peerstore, closer) + resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos) + } + + return resp, nil +} + +func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + lm := make(lgbl.DeferredMap) + lm["peer"] = func() interface{} { return p.Pretty() } + + defer log.EventBegin(ctx, "handleAddProvider", lm).Done() + key := key.Key(pmes.GetKey()) + lm["key"] = func() interface{} { return key.B58String() } + + log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) + + // add provider should use the address given in the message + pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) + for _, pi := range pinfos { + if pi.ID != p { + // we should ignore this provider reccord! not from originator. + // (we chould sign them and check signature later...) + log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p) + continue + } + + if len(pi.Addrs) < 1 { + log.Debugf("%s got no valid addresses for provider %s. Ignore.", dht.self, p) + continue + } + + log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs) + if pi.ID != dht.self { // dont add own addrs. + // add the received addresses to our peerstore. + dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL) + } + dht.providers.AddProvider(ctx, key, p) + } + + return nil, nil +} diff --git a/lookup.go b/lookup.go new file mode 100644 index 00000000000..bcc47bcd8fa --- /dev/null +++ b/lookup.go @@ -0,0 +1,111 @@ +package dht + +import ( + key "github.com/ipfs/go-ipfs/blocks/key" + notif "github.com/ipfs/go-ipfs/notifications" + kb "github.com/ipfs/go-ipfs/routing/kbucket" + pset "github.com/ipfs/go-ipfs/thirdparty/peerset" + + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + context "golang.org/x/net/context" +) + +// Required in order for proper JSON marshaling +func pointerizePeerInfos(pis []pstore.PeerInfo) []*pstore.PeerInfo { + out := make([]*pstore.PeerInfo, len(pis)) + for i, p := range pis { + np := p + out[i] = &np + } + return out +} + +// Kademlia 'node lookup' operation. Returns a channel of the K closest peers +// to the given key +func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) { + e := log.EventBegin(ctx, "getClosestPeers", &key) + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + if len(tablepeers) == 0 { + return nil, kb.ErrLookupFailure + } + + out := make(chan peer.ID, KValue) + peerset := pset.NewLimited(KValue) + + for _, p := range tablepeers { + select { + case out <- p: + case <-ctx.Done(): + return nil, ctx.Err() + } + peerset.Add(p) + } + + // since the query doesnt actually pass our context down + // we have to hack this here. 
whyrusleeping isnt a huge fan of goprocess + parent := ctx + query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + // For DHT query command + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) + + closer, err := dht.closerPeersSingle(ctx, key, p) + if err != nil { + log.Debugf("error getting closer peers: %s", err) + return nil, err + } + + var filtered []pstore.PeerInfo + for _, clp := range closer { + if kb.Closer(clp, dht.self, key) && peerset.TryAdd(clp) { + select { + case out <- clp: + case <-ctx.Done(): + return nil, ctx.Err() + } + filtered = append(filtered, dht.peerstore.PeerInfo(clp)) + } + } + + // For DHT query command + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + Responses: pointerizePeerInfos(filtered), + }) + + return &dhtQueryResult{closerPeers: filtered}, nil + }) + + go func() { + defer close(out) + defer e.Done() + // run it! + _, err := query.Run(ctx, tablepeers) + if err != nil { + log.Debugf("closestPeers query run error: %s", err) + } + }() + + return out, nil +} + +func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key key.Key, p peer.ID) ([]peer.ID, error) { + pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) + if err != nil { + return nil, err + } + + var out []peer.ID + for _, pbp := range pmes.GetCloserPeers() { + pid := peer.ID(pbp.GetId()) + if pid != dht.self { // dont add self + dht.peerstore.AddAddrs(pid, pbp.Addresses(), pstore.TempAddrTTL) + out = append(out, pid) + } + } + return out, nil +} diff --git a/notif.go b/notif.go new file mode 100644 index 00000000000..440a0d2b1d2 --- /dev/null +++ b/notif.go @@ -0,0 +1,39 @@ +package dht + +import ( + ma "github.com/jbenet/go-multiaddr" + + inet "github.com/ipfs/go-libp2p/p2p/net" +) + +// netNotifiee defines methods to be used with the IpfsDHT +type netNotifiee IpfsDHT + +func (nn *netNotifiee) DHT() *IpfsDHT { + return (*IpfsDHT)(nn) +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + dht := nn.DHT() + select { + case <-dht.Process().Closing(): + return + default: + } + dht.Update(dht.Context(), v.RemotePeer()) +} + +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + dht := nn.DHT() + select { + case <-dht.Process().Closing(): + return + default: + } + dht.routingTable.Remove(v.RemotePeer()) +} + +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} diff --git a/package.json b/package.json new file mode 100644 index 00000000000..f2df4f64395 --- /dev/null +++ b/package.json @@ -0,0 +1,101 @@ +{ + "bugs": { + "url": "https://github.com/libp2p/go-libp2p-kad-dht" + }, + "gx": { + "dvcsimport": "github.com/libp2p/go-libp2p-kad-dht", + "goversion": "1.5.2" + }, + "gxDependencies": [ + { + "hash": "QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR", + "name": "go-log", + "version": "1.1.2" + }, + { + "hash": "QmVCe3SNMjkcPgnpFhZs719dheq6xE7gJwjzV7aWcUM4Ms", + "name": "go-libp2p", + "version": "3.3.7" + }, + { + "author": "whyrusleeping", + "hash": "QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP", + "name": "go-libp2p-crypto", + "version": "1.0.3" + }, + { + "author": "whyrusleeping", + "hash": "QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs", + "name": "go-libp2p-peer", + "version": "1.0.8" + }, + { + "hash": 
"QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt", + "name": "go-net", + "version": "0.0.0" + }, + { + "hash": "QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn", + "name": "goprocess", + "version": "0.0.0" + }, + { + "hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV", + "name": "gogo-protobuf", + "version": "0.0.0" + }, + { + "author": "cheggaaa", + "hash": "QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs", + "name": "pb", + "version": "1.0.3" + }, + { + "author": "jbenet", + "hash": "QmX6DhWrpBB5NtadXmPSXYNdVvuLfJXoFNMvUMoVvP5UJa", + "name": "go-context", + "version": "0.0.0" + }, + { + "author": "jbenet", + "hash": "QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk", + "name": "go-datastore", + "version": "0.2.0" + }, + { + "author": "hashicorp", + "hash": "QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy", + "name": "golang-lru", + "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P", + "name": "go-libp2p-peerstore", + "version": "1.1.2" + }, + { + "author": "whyrusleeping", + "hash": "Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj", + "name": "base32", + "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmVvJ27GcLaLSXvcB4auk3Gn3xuWK5ti5ENkZ2pCoJEYW4", + "name": "autobatch", + "version": "0.2.0" + }, + { + "author": "kubuxu", + "hash": "QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5", + "name": "bbloom", + "version": "0.0.2" + } + ], + "gxVersion": "0.4.0", + "language": "go", + "license": "MIT", + "name": "go-libp2p-kad-dht", + "version": "0.0.0" +} \ No newline at end of file diff --git a/pb/Makefile b/pb/Makefile new file mode 100644 index 00000000000..08ac883d0d0 --- /dev/null +++ b/pb/Makefile @@ -0,0 +1,11 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $< + +clean: + rm -f *.pb.go + rm -f *.go diff --git a/pb/dht.pb.go b/pb/dht.pb.go new file mode 100644 index 00000000000..4b8501180bb --- /dev/null +++ b/pb/dht.pb.go @@ -0,0 +1,272 @@ +// Code generated by protoc-gen-gogo. +// source: dht.proto +// DO NOT EDIT! + +/* +Package dht_pb is a generated protocol buffer package. + +It is generated from these files: + dht.proto + +It has these top-level messages: + Message + Record +*/ +package dht_pb + +import proto "github.com/gogo/protobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. 
+var _ = proto.Marshal +var _ = math.Inf + +type Message_MessageType int32 + +const ( + Message_PUT_VALUE Message_MessageType = 0 + Message_GET_VALUE Message_MessageType = 1 + Message_ADD_PROVIDER Message_MessageType = 2 + Message_GET_PROVIDERS Message_MessageType = 3 + Message_FIND_NODE Message_MessageType = 4 + Message_PING Message_MessageType = 5 +) + +var Message_MessageType_name = map[int32]string{ + 0: "PUT_VALUE", + 1: "GET_VALUE", + 2: "ADD_PROVIDER", + 3: "GET_PROVIDERS", + 4: "FIND_NODE", + 5: "PING", +} +var Message_MessageType_value = map[string]int32{ + "PUT_VALUE": 0, + "GET_VALUE": 1, + "ADD_PROVIDER": 2, + "GET_PROVIDERS": 3, + "FIND_NODE": 4, + "PING": 5, +} + +func (x Message_MessageType) Enum() *Message_MessageType { + p := new(Message_MessageType) + *p = x + return p +} +func (x Message_MessageType) String() string { + return proto.EnumName(Message_MessageType_name, int32(x)) +} +func (x *Message_MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_MessageType_value, data, "Message_MessageType") + if err != nil { + return err + } + *x = Message_MessageType(value) + return nil +} + +type Message_ConnectionType int32 + +const ( + // sender does not have a connection to peer, and no extra information (default) + Message_NOT_CONNECTED Message_ConnectionType = 0 + // sender has a live connection to peer + Message_CONNECTED Message_ConnectionType = 1 + // sender recently connected to peer + Message_CAN_CONNECT Message_ConnectionType = 2 + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + Message_CANNOT_CONNECT Message_ConnectionType = 3 +) + +var Message_ConnectionType_name = map[int32]string{ + 0: "NOT_CONNECTED", + 1: "CONNECTED", + 2: "CAN_CONNECT", + 3: "CANNOT_CONNECT", +} +var Message_ConnectionType_value = map[string]int32{ + "NOT_CONNECTED": 0, + "CONNECTED": 1, + "CAN_CONNECT": 2, + "CANNOT_CONNECT": 3, +} + +func (x Message_ConnectionType) Enum() *Message_ConnectionType { + p := new(Message_ConnectionType) + *p = x + return p +} +func (x Message_ConnectionType) String() string { + return proto.EnumName(Message_ConnectionType_name, int32(x)) +} +func (x *Message_ConnectionType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_ConnectionType_value, data, "Message_ConnectionType") + if err != nil { + return err + } + *x = Message_ConnectionType(value) + return nil +} + +type Message struct { + // defines what type of message it is. + Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.pb.Message_MessageType" json:"type,omitempty"` + // defines what coral cluster level this query/response belongs to. + ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"` + // Used to specify the key associated with this message. 
+ // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + // Used to return a value + // PUT_VALUE, GET_VALUE + Record *Record `protobuf:"bytes,3,opt,name=record" json:"record,omitempty"` + // Used to return peers closer to a key in a query + // GET_VALUE, GET_PROVIDERS, FIND_NODE + CloserPeers []*Message_Peer `protobuf:"bytes,8,rep,name=closerPeers" json:"closerPeers,omitempty"` + // Used to return Providers + // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + ProviderPeers []*Message_Peer `protobuf:"bytes,9,rep,name=providerPeers" json:"providerPeers,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} + +func (m *Message) GetType() Message_MessageType { + if m != nil && m.Type != nil { + return *m.Type + } + return Message_PUT_VALUE +} + +func (m *Message) GetClusterLevelRaw() int32 { + if m != nil && m.ClusterLevelRaw != nil { + return *m.ClusterLevelRaw + } + return 0 +} + +func (m *Message) GetKey() string { + if m != nil && m.Key != nil { + return *m.Key + } + return "" +} + +func (m *Message) GetRecord() *Record { + if m != nil { + return m.Record + } + return nil +} + +func (m *Message) GetCloserPeers() []*Message_Peer { + if m != nil { + return m.CloserPeers + } + return nil +} + +func (m *Message) GetProviderPeers() []*Message_Peer { + if m != nil { + return m.ProviderPeers + } + return nil +} + +type Message_Peer struct { + // ID of a given peer. + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + // multiaddrs for a given peer + Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + // used to signal the sender's connection capabilities to the peer + Connection *Message_ConnectionType `protobuf:"varint,3,opt,name=connection,enum=dht.pb.Message_ConnectionType" json:"connection,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message_Peer) Reset() { *m = Message_Peer{} } +func (m *Message_Peer) String() string { return proto.CompactTextString(m) } +func (*Message_Peer) ProtoMessage() {} + +func (m *Message_Peer) GetId() string { + if m != nil && m.Id != nil { + return *m.Id + } + return "" +} + +func (m *Message_Peer) GetAddrs() [][]byte { + if m != nil { + return m.Addrs + } + return nil +} + +func (m *Message_Peer) GetConnection() Message_ConnectionType { + if m != nil && m.Connection != nil { + return *m.Connection + } + return Message_NOT_CONNECTED +} + +// Record represents a dht record that contains a value +// for a key value pair +type Record struct { + // The key that references this record + Key *string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + // The actual value this record is storing + Value []byte `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` + // hash of the authors public key + Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"` + // A PKI signature for the key+value+author + Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` + // Time the record was received, set by receiver + TimeReceived *string `protobuf:"bytes,5,opt,name=timeReceived" json:"timeReceived,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Record) Reset() { *m = Record{} } +func (m *Record) String() string { return proto.CompactTextString(m) } +func (*Record) ProtoMessage() {} + +func (m *Record) GetKey() string { + if m != nil && 
m.Key != nil { + return *m.Key + } + return "" +} + +func (m *Record) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Record) GetAuthor() string { + if m != nil && m.Author != nil { + return *m.Author + } + return "" +} + +func (m *Record) GetSignature() []byte { + if m != nil { + return m.Signature + } + return nil +} + +func (m *Record) GetTimeReceived() string { + if m != nil && m.TimeReceived != nil { + return *m.TimeReceived + } + return "" +} + +func init() { + proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) + proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) +} diff --git a/pb/dht.proto b/pb/dht.proto new file mode 100644 index 00000000000..de88c3451b8 --- /dev/null +++ b/pb/dht.proto @@ -0,0 +1,81 @@ +package dht.pb; + +//run `protoc --go_out=. *.proto` to generate + +message Message { + enum MessageType { + PUT_VALUE = 0; + GET_VALUE = 1; + ADD_PROVIDER = 2; + GET_PROVIDERS = 3; + FIND_NODE = 4; + PING = 5; + } + + enum ConnectionType { + // sender does not have a connection to peer, and no extra information (default) + NOT_CONNECTED = 0; + + // sender has a live connection to peer + CONNECTED = 1; + + // sender recently connected to peer + CAN_CONNECT = 2; + + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + CANNOT_CONNECT = 3; + } + + message Peer { + // ID of a given peer. + optional string id = 1; + + // multiaddrs for a given peer + repeated bytes addrs = 2; + + // used to signal the sender's connection capabilities to the peer + optional ConnectionType connection = 3; + } + + // defines what type of message it is. + optional MessageType type = 1; + + // defines what coral cluster level this query/response belongs to. + optional int32 clusterLevelRaw = 10; + + // Used to specify the key associated with this message. 
+ // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + optional string key = 2; + + // Used to return a value + // PUT_VALUE, GET_VALUE + optional Record record = 3; + + // Used to return peers closer to a key in a query + // GET_VALUE, GET_PROVIDERS, FIND_NODE + repeated Peer closerPeers = 8; + + // Used to return Providers + // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + repeated Peer providerPeers = 9; +} + +// Record represents a dht record that contains a value +// for a key value pair +message Record { + // The key that references this record + optional string key = 1; + + // The actual value this record is storing + optional bytes value = 2; + + // hash of the authors public key + optional string author = 3; + + // A PKI signature for the key+value+author + optional bytes signature = 4; + + // Time the record was received, set by receiver + optional string timeReceived = 5; +} diff --git a/pb/message.go b/pb/message.go new file mode 100644 index 00000000000..2f61c0573a5 --- /dev/null +++ b/pb/message.go @@ -0,0 +1,185 @@ +package dht_pb + +import ( + ma "github.com/jbenet/go-multiaddr" + + key "github.com/ipfs/go-ipfs/blocks/key" + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + inet "github.com/ipfs/go-libp2p/p2p/net" + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("dht.pb") + +type PeerRoutingInfo struct { + pstore.PeerInfo + inet.Connectedness +} + +// NewMessage constructs a new dht message with given type, key, and level +func NewMessage(typ Message_MessageType, key string, level int) *Message { + m := &Message{ + Type: &typ, + Key: &key, + } + m.SetClusterLevel(level) + return m +} + +func peerRoutingInfoToPBPeer(p PeerRoutingInfo) *Message_Peer { + pbp := new(Message_Peer) + + pbp.Addrs = make([][]byte, len(p.Addrs)) + for i, maddr := range p.Addrs { + pbp.Addrs[i] = maddr.Bytes() // Bytes, not String. Compressed. + } + s := string(p.ID) + pbp.Id = &s + c := ConnectionType(p.Connectedness) + pbp.Connection = &c + return pbp +} + +func peerInfoToPBPeer(p pstore.PeerInfo) *Message_Peer { + pbp := new(Message_Peer) + + pbp.Addrs = make([][]byte, len(p.Addrs)) + for i, maddr := range p.Addrs { + pbp.Addrs[i] = maddr.Bytes() // Bytes, not String. Compressed. + } + s := string(p.ID) + pbp.Id = &s + return pbp +} + +// PBPeerToPeer turns a *Message_Peer into its pstore.PeerInfo counterpart +func PBPeerToPeerInfo(pbp *Message_Peer) pstore.PeerInfo { + return pstore.PeerInfo{ + ID: peer.ID(pbp.GetId()), + Addrs: pbp.Addresses(), + } +} + +// RawPeerInfosToPBPeers converts a slice of Peers into a slice of *Message_Peers, +// ready to go out on the wire. +func RawPeerInfosToPBPeers(peers []pstore.PeerInfo) []*Message_Peer { + pbpeers := make([]*Message_Peer, len(peers)) + for i, p := range peers { + pbpeers[i] = peerInfoToPBPeer(p) + } + return pbpeers +} + +// PeersToPBPeers converts given []peer.Peer into a set of []*Message_Peer, +// which can be written to a message and sent out. the key thing this function +// does (in addition to PeersToPBPeers) is set the ConnectionType with +// information from the given inet.Network. 
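+// It wraps RawPeerInfosToPBPeers and then fills in each peer's Connection
+// field; callers in the dht package use it as, for example,
+// resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos).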
+func PeerInfosToPBPeers(n inet.Network, peers []pstore.PeerInfo) []*Message_Peer { + pbps := RawPeerInfosToPBPeers(peers) + for i, pbp := range pbps { + c := ConnectionType(n.Connectedness(peers[i].ID)) + pbp.Connection = &c + } + return pbps +} + +func PeerRoutingInfosToPBPeers(peers []PeerRoutingInfo) []*Message_Peer { + pbpeers := make([]*Message_Peer, len(peers)) + for i, p := range peers { + pbpeers[i] = peerRoutingInfoToPBPeer(p) + } + return pbpeers +} + +// PBPeersToPeerInfos converts given []*Message_Peer into []pstore.PeerInfo +// Invalid addresses will be silently omitted. +func PBPeersToPeerInfos(pbps []*Message_Peer) []pstore.PeerInfo { + peers := make([]pstore.PeerInfo, 0, len(pbps)) + for _, pbp := range pbps { + peers = append(peers, PBPeerToPeerInfo(pbp)) + } + return peers +} + +// Addresses returns a multiaddr associated with the Message_Peer entry +func (m *Message_Peer) Addresses() []ma.Multiaddr { + if m == nil { + return nil + } + + maddrs := make([]ma.Multiaddr, 0, len(m.Addrs)) + for _, addr := range m.Addrs { + maddr, err := ma.NewMultiaddrBytes(addr) + if err != nil { + log.Warningf("error decoding Multiaddr for peer: %s", m.GetId()) + continue + } + + maddrs = append(maddrs, maddr) + } + return maddrs +} + +// GetClusterLevel gets and adjusts the cluster level on the message. +// a +/- 1 adjustment is needed to distinguish a valid first level (1) and +// default "no value" protobuf behavior (0) +func (m *Message) GetClusterLevel() int { + level := m.GetClusterLevelRaw() - 1 + if level < 0 { + return 0 + } + return int(level) +} + +// SetClusterLevel adjusts and sets the cluster level on the message. +// a +/- 1 adjustment is needed to distinguish a valid first level (1) and +// default "no value" protobuf behavior (0) +func (m *Message) SetClusterLevel(level int) { + lvl := int32(level) + m.ClusterLevelRaw = &lvl +} + +// Loggable turns a Message into machine-readable log output +func (m *Message) Loggable() map[string]interface{} { + return map[string]interface{}{ + "message": map[string]string{ + "type": m.Type.String(), + "key": key.Key(m.GetKey()).B58String(), + }, + } +} + +// ConnectionType returns a Message_ConnectionType associated with the +// inet.Connectedness. +func ConnectionType(c inet.Connectedness) Message_ConnectionType { + switch c { + default: + return Message_NOT_CONNECTED + case inet.NotConnected: + return Message_NOT_CONNECTED + case inet.Connected: + return Message_CONNECTED + case inet.CanConnect: + return Message_CAN_CONNECT + case inet.CannotConnect: + return Message_CANNOT_CONNECT + } +} + +// Connectedness returns an inet.Connectedness associated with the +// Message_ConnectionType. 
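+// It is the inverse of ConnectionType above; unrecognized values map to
+// inet.NotConnected.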
+func Connectedness(c Message_ConnectionType) inet.Connectedness { + switch c { + default: + return inet.NotConnected + case Message_NOT_CONNECTED: + return inet.NotConnected + case Message_CONNECTED: + return inet.Connected + case Message_CAN_CONNECT: + return inet.CanConnect + case Message_CANNOT_CONNECT: + return inet.CannotConnect + } +} diff --git a/pb/message_test.go b/pb/message_test.go new file mode 100644 index 00000000000..71f4abdc5ec --- /dev/null +++ b/pb/message_test.go @@ -0,0 +1,15 @@ +package dht_pb + +import ( + "testing" +) + +func TestBadAddrsDontReturnNil(t *testing.T) { + mp := new(Message_Peer) + mp.Addrs = [][]byte{[]byte("NOT A VALID MULTIADDR")} + + addrs := mp.Addresses() + if len(addrs) > 0 { + t.Fatal("shouldnt have any multiaddrs") + } +} diff --git a/providers/providers.go b/providers/providers.go new file mode 100644 index 00000000000..417a45424f7 --- /dev/null +++ b/providers/providers.go @@ -0,0 +1,353 @@ +package providers + +import ( + "encoding/binary" + "fmt" + "strings" + "time" + + lru "github.com/hashicorp/golang-lru" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + peer "github.com/ipfs/go-libp2p-peer" + logging "github.com/ipfs/go-log" + goprocess "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" + autobatch "github.com/whyrusleeping/autobatch" + base32 "github.com/whyrusleeping/base32" + + key "github.com/ipfs/go-ipfs/blocks/key" + flags "github.com/ipfs/go-ipfs/flags" + + context "golang.org/x/net/context" +) + +var batchBufferSize = 256 + +func init() { + if flags.LowMemMode { + batchBufferSize = 8 + } +} + +var log = logging.Logger("providers") + +var lruCacheSize = 256 +var ProvideValidity = time.Hour * 24 +var defaultCleanupInterval = time.Hour + +type ProviderManager struct { + // all non channel fields are meant to be accessed only within + // the run method + providers *lru.Cache + lpeer peer.ID + dstore ds.Datastore + + newprovs chan *addProv + getprovs chan *getProv + period time.Duration + proc goprocess.Process + + cleanupInterval time.Duration +} + +type providerSet struct { + providers []peer.ID + set map[peer.ID]time.Time +} + +type addProv struct { + k key.Key + val peer.ID +} + +type getProv struct { + k key.Key + resp chan []peer.ID +} + +func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager { + pm := new(ProviderManager) + pm.getprovs = make(chan *getProv) + pm.newprovs = make(chan *addProv) + pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) + cache, err := lru.New(lruCacheSize) + if err != nil { + panic(err) //only happens if negative value is passed to lru constructor + } + pm.providers = cache + + pm.proc = goprocessctx.WithContext(ctx) + pm.cleanupInterval = defaultCleanupInterval + pm.proc.Go(func(p goprocess.Process) { pm.run() }) + + return pm +} + +const providersKeyPrefix = "/providers/" + +func mkProvKey(k key.Key) ds.Key { + return ds.NewKey(providersKeyPrefix + base32.RawStdEncoding.EncodeToString([]byte(k))) +} + +func (pm *ProviderManager) Process() goprocess.Process { + return pm.proc +} + +func (pm *ProviderManager) providersForKey(k key.Key) ([]peer.ID, error) { + pset, err := pm.getProvSet(k) + if err != nil { + return nil, err + } + return pset.providers, nil +} + +func (pm *ProviderManager) getProvSet(k key.Key) (*providerSet, error) { + cached, ok := pm.providers.Get(k) + if ok { + return cached.(*providerSet), nil + } + + pset, err := loadProvSet(pm.dstore, k) + if err != nil { + return 
nil, err + } + + if len(pset.providers) > 0 { + pm.providers.Add(k, pset) + } + + return pset, nil +} + +func loadProvSet(dstore ds.Datastore, k key.Key) (*providerSet, error) { + res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k).String()}) + if err != nil { + return nil, err + } + + out := newProviderSet() + for e := range res.Next() { + if e.Error != nil { + log.Error("got an error: ", e.Error) + continue + } + parts := strings.Split(e.Key, "/") + if len(parts) != 4 { + log.Warning("incorrectly formatted key: ", e.Key) + continue + } + + decstr, err := base32.RawStdEncoding.DecodeString(parts[len(parts)-1]) + if err != nil { + log.Error("base32 decoding error: ", err) + continue + } + + pid := peer.ID(decstr) + + t, err := readTimeValue(e.Value) + if err != nil { + log.Warning("parsing providers record from disk: ", err) + continue + } + + out.setVal(pid, t) + } + + return out, nil +} + +func readTimeValue(i interface{}) (time.Time, error) { + data, ok := i.([]byte) + if !ok { + return time.Time{}, fmt.Errorf("data was not a []byte") + } + + nsec, _ := binary.Varint(data) + + return time.Unix(0, nsec), nil +} + +func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error { + iprovs, ok := pm.providers.Get(k) + if !ok { + iprovs = newProviderSet() + pm.providers.Add(k, iprovs) + } + provs := iprovs.(*providerSet) + now := time.Now() + provs.setVal(p, now) + + return writeProviderEntry(pm.dstore, k, p, now) +} + +func writeProviderEntry(dstore ds.Datastore, k key.Key, p peer.ID, t time.Time) error { + dsk := mkProvKey(k).ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) + + buf := make([]byte, 16) + n := binary.PutVarint(buf, t.UnixNano()) + + return dstore.Put(dsk, buf[:n]) +} + +func (pm *ProviderManager) deleteProvSet(k key.Key) error { + pm.providers.Remove(k) + + res, err := pm.dstore.Query(dsq.Query{ + KeysOnly: true, + Prefix: mkProvKey(k).String(), + }) + + entries, err := res.Rest() + if err != nil { + return err + } + + for _, e := range entries { + err := pm.dstore.Delete(ds.NewKey(e.Key)) + if err != nil { + log.Error("deleting provider set: ", err) + } + } + return nil +} + +func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) { + res, err := pm.dstore.Query(dsq.Query{ + KeysOnly: true, + Prefix: providersKeyPrefix, + }) + + if err != nil { + return nil, err + } + + entries, err := res.Rest() + if err != nil { + return nil, err + } + + out := make([]key.Key, 0, len(entries)) + seen := make(map[key.Key]struct{}) + for _, e := range entries { + parts := strings.Split(e.Key, "/") + if len(parts) != 4 { + log.Warning("incorrectly formatted provider entry in datastore") + continue + } + decoded, err := base32.RawStdEncoding.DecodeString(parts[2]) + if err != nil { + log.Warning("error decoding base32 provider key") + continue + } + + k := key.Key(decoded) + if _, ok := seen[k]; !ok { + out = append(out, key.Key(decoded)) + seen[k] = struct{}{} + } + } + + return out, nil +} + +func (pm *ProviderManager) run() { + tick := time.NewTicker(pm.cleanupInterval) + for { + select { + case np := <-pm.newprovs: + err := pm.addProv(np.k, np.val) + if err != nil { + log.Error("error adding new providers: ", err) + } + case gp := <-pm.getprovs: + provs, err := pm.providersForKey(gp.k) + if err != nil && err != ds.ErrNotFound { + log.Error("error reading providers: ", err) + } + + gp.resp <- provs + case <-tick.C: + keys, err := pm.getAllProvKeys() + if err != nil { + log.Error("Error loading provider keys: ", err) + continue + } + for _, k := range keys { + provs, 
err := pm.getProvSet(k) + if err != nil { + log.Error("error loading known provset: ", err) + continue + } + var filtered []peer.ID + for p, t := range provs.set { + if time.Now().Sub(t) > ProvideValidity { + delete(provs.set, p) + } else { + filtered = append(filtered, p) + } + } + + if len(filtered) > 0 { + provs.providers = filtered + } else { + err := pm.deleteProvSet(k) + if err != nil { + log.Error("error deleting provider set: ", err) + } + } + } + case <-pm.proc.Closing(): + return + } + } +} + +func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) { + prov := &addProv{ + k: k, + val: val, + } + select { + case pm.newprovs <- prov: + case <-ctx.Done(): + } +} + +func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID { + gp := &getProv{ + k: k, + resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking + } + select { + case <-ctx.Done(): + return nil + case pm.getprovs <- gp: + } + select { + case <-ctx.Done(): + return nil + case peers := <-gp.resp: + return peers + } +} + +func newProviderSet() *providerSet { + return &providerSet{ + set: make(map[peer.ID]time.Time), + } +} + +func (ps *providerSet) Add(p peer.ID) { + ps.setVal(p, time.Now()) +} + +func (ps *providerSet) setVal(p peer.ID, t time.Time) { + _, found := ps.set[p] + if !found { + ps.providers = append(ps.providers, p) + } + + ps.set[p] = t +} diff --git a/providers/providers_test.go b/providers/providers_test.go new file mode 100644 index 00000000000..b3d82bac0f7 --- /dev/null +++ b/providers/providers_test.go @@ -0,0 +1,150 @@ +package providers + +import ( + "fmt" + "testing" + "time" + + ds "github.com/ipfs/go-datastore" + key "github.com/ipfs/go-ipfs/blocks/key" + peer "github.com/ipfs/go-libp2p-peer" + + context "golang.org/x/net/context" +) + +func TestProviderManager(t *testing.T) { + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + a := key.Key("test") + p.AddProvider(ctx, a, peer.ID("testingprovider")) + resp := p.GetProviders(ctx, a) + if len(resp) != 1 { + t.Fatal("Could not retrieve provider.") + } + p.proc.Close() +} + +func TestProvidersDatastore(t *testing.T) { + old := lruCacheSize + lruCacheSize = 10 + defer func() { lruCacheSize = old }() + + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + defer p.proc.Close() + + friend := peer.ID("friend") + var keys []key.Key + for i := 0; i < 100; i++ { + k := key.Key(fmt.Sprint(i)) + keys = append(keys, k) + p.AddProvider(ctx, k, friend) + } + + for _, k := range keys { + resp := p.GetProviders(ctx, k) + if len(resp) != 1 { + t.Fatal("Could not retrieve provider.") + } + if resp[0] != friend { + t.Fatal("expected provider to be 'friend'") + } + } +} + +func TestProvidersSerialization(t *testing.T) { + dstore := ds.NewMapDatastore() + + k := key.Key("my key!") + p1 := peer.ID("peer one") + p2 := peer.ID("peer two") + pt1 := time.Now() + pt2 := pt1.Add(time.Hour) + + err := writeProviderEntry(dstore, k, p1, pt1) + if err != nil { + t.Fatal(err) + } + + err = writeProviderEntry(dstore, k, p2, pt2) + if err != nil { + t.Fatal(err) + } + + pset, err := loadProvSet(dstore, k) + if err != nil { + t.Fatal(err) + } + + lt1, ok := pset.set[p1] + if !ok { + t.Fatal("failed to load set correctly") + } + + if pt1 != lt1 { + t.Fatal("time wasnt serialized correctly") + } + + lt2, ok := pset.set[p2] + if !ok { + t.Fatal("failed to load set correctly") + } + + if pt2 != 
lt2 { + t.Fatal("time wasnt serialized correctly") + } +} + +func TestProvidesExpire(t *testing.T) { + pval := ProvideValidity + cleanup := defaultCleanupInterval + ProvideValidity = time.Second / 2 + defaultCleanupInterval = time.Second / 2 + defer func() { + ProvideValidity = pval + defaultCleanupInterval = cleanup + }() + + ctx := context.Background() + mid := peer.ID("testing") + p := NewProviderManager(ctx, mid, ds.NewMapDatastore()) + + peers := []peer.ID{"a", "b"} + var keys []key.Key + for i := 0; i < 10; i++ { + k := key.Key(i) + keys = append(keys, k) + p.AddProvider(ctx, k, peers[0]) + p.AddProvider(ctx, k, peers[1]) + } + + for i := 0; i < 10; i++ { + out := p.GetProviders(ctx, keys[i]) + if len(out) != 2 { + t.Fatal("expected providers to still be there") + } + } + + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + out := p.GetProviders(ctx, keys[i]) + if len(out) > 2 { + t.Fatal("expected providers to be cleaned up") + } + } + + if p.providers.Len() != 0 { + t.Fatal("providers map not cleaned up") + } + + allprovs, err := p.getAllProvKeys() + if err != nil { + t.Fatal(err) + } + + if len(allprovs) != 0 { + t.Fatal("expected everything to be cleaned out of the datastore") + } +} diff --git a/query.go b/query.go new file mode 100644 index 00000000000..cce43143ada --- /dev/null +++ b/query.go @@ -0,0 +1,297 @@ +package dht + +import ( + "sync" + + key "github.com/ipfs/go-ipfs/blocks/key" + notif "github.com/ipfs/go-ipfs/notifications" + "github.com/ipfs/go-ipfs/routing" + pset "github.com/ipfs/go-ipfs/thirdparty/peerset" + todoctr "github.com/ipfs/go-ipfs/thirdparty/todocounter" + + u "github.com/ipfs/go-ipfs-util" + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + queue "github.com/ipfs/go-libp2p-peerstore/queue" + logging "github.com/ipfs/go-log" + process "github.com/jbenet/goprocess" + ctxproc "github.com/jbenet/goprocess/context" + context "golang.org/x/net/context" +) + +var maxQueryConcurrency = AlphaValue + +type dhtQuery struct { + dht *IpfsDHT + key key.Key // the key we're querying for + qfunc queryFunc // the function to execute per peer + concurrency int // the concurrency parameter +} + +type dhtQueryResult struct { + value []byte // GetValue + peer pstore.PeerInfo // FindPeer + providerPeers []pstore.PeerInfo // GetProviders + closerPeers []pstore.PeerInfo // * + success bool +} + +// constructs query +func (dht *IpfsDHT) newQuery(k key.Key, f queryFunc) *dhtQuery { + return &dhtQuery{ + key: k, + dht: dht, + qfunc: f, + concurrency: maxQueryConcurrency, + } +} + +// QueryFunc is a function that runs a particular query with a given peer. +// It returns either: +// - the value +// - a list of peers potentially better able to serve the query +// - an error +type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error) + +// Run runs the query at hand. pass in a list of peers to use first. +func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + runner := newQueryRunner(q) + return runner.Run(ctx, peers) +} + +type dhtQueryRunner struct { + query *dhtQuery // query to run + peersSeen *pset.PeerSet // all peers queried. 
prevent querying same peer 2x + peersToQuery *queue.ChanQueue // peers remaining to be queried + peersRemaining todoctr.Counter // peersToQuery + currently processing + + result *dhtQueryResult // query result + errs u.MultiErr // result errors. maybe should be a map[peer.ID]error + + rateLimit chan struct{} // processing semaphore + log logging.EventLogger + + runCtx context.Context + + proc process.Process + sync.RWMutex +} + +func newQueryRunner(q *dhtQuery) *dhtQueryRunner { + proc := process.WithParent(process.Background()) + ctx := ctxproc.OnClosingContext(proc) + return &dhtQueryRunner{ + query: q, + peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))), + peersRemaining: todoctr.NewSyncCounter(), + peersSeen: pset.New(), + rateLimit: make(chan struct{}, q.concurrency), + proc: proc, + } +} + +func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) { + r.log = log + r.runCtx = ctx + + if len(peers) == 0 { + log.Warning("Running query with no peers!") + return nil, nil + } + + // setup concurrency rate limiting + for i := 0; i < r.query.concurrency; i++ { + r.rateLimit <- struct{}{} + } + + // add all the peers we got first. + for _, p := range peers { + r.addPeerToQuery(p) + } + + // go do this thing. + // do it as a child proc to make sure Run exits + // ONLY AFTER spawn workers has exited. + r.proc.Go(r.spawnWorkers) + + // so workers are working. + + // wait until they're done. + err := routing.ErrNotFound + + // now, if the context finishes, close the proc. + // we have to do it here because the logic before is setup, which + // should run without closing the proc. + ctxproc.CloseAfterContext(r.proc, ctx) + + select { + case <-r.peersRemaining.Done(): + r.proc.Close() + r.RLock() + defer r.RUnlock() + + err = routing.ErrNotFound + + // if every query to every peer failed, something must be very wrong. + if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() { + log.Debugf("query errs: %s", r.errs) + err = r.errs[0] + } + + case <-r.proc.Closed(): + r.RLock() + defer r.RUnlock() + err = context.DeadlineExceeded + } + + if r.result != nil && r.result.success { + return r.result, nil + } + + return nil, err +} + +func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) { + // if new peer is ourselves... + if next == r.query.dht.self { + r.log.Debug("addPeerToQuery skip self") + return + } + + if !r.peersSeen.TryAdd(next) { + return + } + + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ + Type: notif.AddingPeer, + ID: next, + }) + + r.peersRemaining.Increment(1) + select { + case r.peersToQuery.EnqChan <- next: + case <-r.proc.Closing(): + } +} + +func (r *dhtQueryRunner) spawnWorkers(proc process.Process) { + for { + + select { + case <-r.peersRemaining.Done(): + return + + case <-r.proc.Closing(): + return + + case <-r.rateLimit: + select { + case p, more := <-r.peersToQuery.DeqChan: + if !more { + return // channel closed. + } + + // do it as a child func to make sure Run exits + // ONLY AFTER spawn workers has exited. + proc.Go(func(proc process.Process) { + r.queryPeer(proc, p) + }) + case <-r.proc.Closing(): + return + case <-r.peersRemaining.Done(): + return + } + } + } +} + +func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { + // ok let's do this! + + // create a context from our proc. 
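+ // deriving the context from this worker's process ties the RPC's lifetime
+ // to the query runner: once the runner is closed, the call is cancelled.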
+ ctx := ctxproc.OnClosingContext(proc) + + // make sure we do this when we exit + defer func() { + // signal we're done proccessing peer p + r.peersRemaining.Decrement(1) + r.rateLimit <- struct{}{} + }() + + // make sure we're connected to the peer. + // FIXME abstract away into the network layer + if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { + log.Debug("not connected. dialing.") + + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ + Type: notif.DialingPeer, + ID: p, + }) + // while we dial, we do not take up a rate limit. this is to allow + // forward progress during potentially very high latency dials. + r.rateLimit <- struct{}{} + + pi := pstore.PeerInfo{ID: p} + + if err := r.query.dht.host.Connect(ctx, pi); err != nil { + log.Debugf("Error connecting: %s", err) + + notif.PublishQueryEvent(r.runCtx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + ID: p, + }) + + r.Lock() + r.errs = append(r.errs, err) + r.Unlock() + <-r.rateLimit // need to grab it again, as we deferred. + return + } + <-r.rateLimit // need to grab it again, as we deferred. + log.Debugf("connected. dial success.") + } + + // finally, run the query against this peer + res, err := r.query.qfunc(ctx, p) + + if err != nil { + log.Debugf("ERROR worker for: %v %v", p, err) + r.Lock() + r.errs = append(r.errs, err) + r.Unlock() + + } else if res.success { + log.Debugf("SUCCESS worker for: %v %s", p, res) + r.Lock() + r.result = res + r.Unlock() + go r.proc.Close() // signal to everyone that we're done. + // must be async, as we're one of the children, and Close blocks. + + } else if len(res.closerPeers) > 0 { + log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers)) + for _, next := range res.closerPeers { + if next.ID == r.query.dht.self { // dont add self. + log.Debugf("PEERS CLOSER -- worker for: %v found self", p) + continue + } + + // add their addresses to the dialer's peerstore + r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL) + r.addPeerToQuery(next.ID) + log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) + } + } else { + log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p) + } +} diff --git a/records.go b/records.go new file mode 100644 index 00000000000..22389930686 --- /dev/null +++ b/records.go @@ -0,0 +1,149 @@ +package dht + +import ( + "fmt" + "time" + + routing "github.com/ipfs/go-ipfs/routing" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + record "github.com/ipfs/go-ipfs/routing/record" + ci "github.com/ipfs/go-libp2p-crypto" + peer "github.com/ipfs/go-libp2p-peer" + ctxfrac "github.com/jbenet/go-context/frac" + "golang.org/x/net/context" +) + +// MaxRecordAge specifies the maximum time that any node will hold onto a record +// from the time its received. This does not apply to any other forms of validity that +// the record may contain. +// For example, a record may contain an ipns entry with an EOL saying its valid +// until the year 2020 (a great time in the future). For that record to stick around +// it must be rebroadcasted more frequently than once every 'MaxRecordAge' +const MaxRecordAge = time.Hour * 36 + +func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { + log.Debugf("getPublicKey for: %s", p) + + // check locally. + pk := dht.peerstore.PubKey(p) + if pk != nil { + return pk, nil + } + + // ok, try the node itself. if they're overwhelmed or slow we can move on. 
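+ // NOTE: ctxT carries a fractional (0.3) deadline and, as written, only
+ // bounds the DHT lookup further below; the direct query to the node itself
+ // is run against the parent ctx.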
+ ctxT, cancelFunc := ctxfrac.WithDeadlineFraction(ctx, 0.3) + defer cancelFunc() + if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil { + err := dht.peerstore.AddPubKey(p, pk) + if err != nil { + return pk, err + } + return pk, nil + } + + // last ditch effort: let's try the dht. + log.Debugf("pk for %s not in peerstore, and peer failed. Trying DHT.", p) + pkkey := routing.KeyForPublicKey(p) + + val, err := dht.GetValue(ctxT, pkkey) + if err != nil { + log.Warning("Failed to find requested public key.") + return nil, err + } + + pk, err = ci.UnmarshalPublicKey(val) + if err != nil { + log.Debugf("Failed to unmarshal public key: %s", err) + return nil, err + } + + return pk, dht.peerstore.AddPubKey(p, pk) +} + +func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { + + // check locally, just in case... + pk := dht.peerstore.PubKey(p) + if pk != nil { + return pk, nil + } + + pkkey := routing.KeyForPublicKey(p) + pmes, err := dht.getValueSingle(ctx, p, pkkey) + if err != nil { + return nil, err + } + + // node doesn't have key :( + record := pmes.GetRecord() + if record == nil { + return nil, fmt.Errorf("Node not responding with its public key: %s", p) + } + + // Success! We were given the value. we don't need to check + // validity because a) we can't. b) we know the hash of the + // key we're looking for. + val := record.GetValue() + log.Debug("DHT got a value from other peer") + + pk, err = ci.UnmarshalPublicKey(val) + if err != nil { + return nil, err + } + + id, err := peer.IDFromPublicKey(pk) + if err != nil { + return nil, err + } + if id != p { + return nil, fmt.Errorf("public key does not match id: %s", p) + } + + // ok! it's valid. we got it! + log.Debugf("DHT got public key from node itself.") + return pk, nil +} + +// verifyRecordLocally attempts to verify a record. if we do not have the public +// key, we fail. we do not search the dht. +func (dht *IpfsDHT) verifyRecordLocally(r *pb.Record) error { + + if len(r.Signature) > 0 { + // First, validate the signature + p := peer.ID(r.GetAuthor()) + pk := dht.peerstore.PubKey(p) + if pk == nil { + return fmt.Errorf("do not have public key for %s", p) + } + + if err := record.CheckRecordSig(r, pk); err != nil { + return err + } + } + + return dht.Validator.VerifyRecord(r) +} + +// verifyRecordOnline verifies a record, searching the DHT for the public key +// if necessary. The reason there is a distinction in the functions is that +// retrieving arbitrary public keys from the DHT as a result of passively +// receiving records (e.g. through a PUT_VALUE or ADD_PROVIDER) can cause a +// massive amplification attack on the dht. Use with care. +func (dht *IpfsDHT) verifyRecordOnline(ctx context.Context, r *pb.Record) error { + + if len(r.Signature) > 0 { + // get the public key, search for it if necessary. 
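+ // GetPublicKey may itself go to the network, which is exactly the
+ // amplification risk described above, so this path is reserved for
+ // records we explicitly asked for.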
+ p := peer.ID(r.GetAuthor()) + pk, err := dht.GetPublicKey(ctx, p) + if err != nil { + return err + } + + err = record.CheckRecordSig(r, pk) + if err != nil { + return err + } + } + + return dht.Validator.VerifyRecord(r) +} diff --git a/routing.go b/routing.go new file mode 100644 index 00000000000..cf87227b082 --- /dev/null +++ b/routing.go @@ -0,0 +1,532 @@ +package dht + +import ( + "bytes" + "fmt" + "runtime" + "sync" + "time" + + key "github.com/ipfs/go-ipfs/blocks/key" + notif "github.com/ipfs/go-ipfs/notifications" + "github.com/ipfs/go-ipfs/routing" + pb "github.com/ipfs/go-ipfs/routing/dht/pb" + kb "github.com/ipfs/go-ipfs/routing/kbucket" + record "github.com/ipfs/go-ipfs/routing/record" + pset "github.com/ipfs/go-ipfs/thirdparty/peerset" + + peer "github.com/ipfs/go-libp2p-peer" + pstore "github.com/ipfs/go-libp2p-peerstore" + inet "github.com/ipfs/go-libp2p/p2p/net" + context "golang.org/x/net/context" +) + +// asyncQueryBuffer is the size of buffered channels in async queries. This +// buffer allows multiple queries to execute simultaneously, return their +// results and continue querying closer peers. Note that different query +// results will wait for the channel to drain. +var asyncQueryBuffer = 10 + +// This file implements the Routing interface for the IpfsDHT struct. + +// Basic Put/Get + +// PutValue adds value corresponding to given Key. +// This is the top level "Store" operation of the DHT +func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) error { + log.Debugf("PutValue %s", key) + sk, err := dht.getOwnPrivateKey() + if err != nil { + return err + } + + sign, err := dht.Validator.IsSigned(key) + if err != nil { + return err + } + + rec, err := record.MakePutRecord(sk, key, value, sign) + if err != nil { + log.Debug("creation of record failed!") + return err + } + + err = dht.putLocal(key, rec) + if err != nil { + return err + } + + pchan, err := dht.GetClosestPeers(ctx, key) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + for p := range pchan { + wg.Add(1) + go func(p peer.ID) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer wg.Done() + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.Value, + ID: p, + }) + + err := dht.putValueToPeer(ctx, p, key, rec) + if err != nil { + log.Debugf("failed putting value to peer: %s", err) + } + }(p) + } + wg.Wait() + return nil +} + +// GetValue searches for the value corresponding to given Key. 
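+// It collects up to 16 records, picks the best one via the configured
+// Selector, and asynchronously pushes that record back to any peer (or the
+// local datastore) that returned a different, less-valid value.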
+func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + vals, err := dht.GetValues(ctx, key, 16) + if err != nil { + return nil, err + } + + var recs [][]byte + for _, v := range vals { + if v.Val != nil { + recs = append(recs, v.Val) + } + } + + i, err := dht.Selector.BestRecord(key, recs) + if err != nil { + return nil, err + } + + best := recs[i] + log.Debugf("GetValue %v %v", key, best) + if best == nil { + log.Errorf("GetValue yielded correct record with nil value.") + return nil, routing.ErrNotFound + } + + fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true) + if err != nil { + // probably shouldnt actually 'error' here as we have found a value we like, + // but this call failing probably isnt something we want to ignore + return nil, err + } + + for _, v := range vals { + // if someone sent us a different 'less-valid' record, lets correct them + if !bytes.Equal(v.Val, best) { + go func(v routing.RecvdVal) { + if v.From == dht.self { + err := dht.putLocal(key, fixupRec) + if err != nil { + log.Error("Error correcting local dht entry:", err) + } + return + } + ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30) + defer cancel() + err := dht.putValueToPeer(ctx, v.From, key, fixupRec) + if err != nil { + log.Error("Error correcting DHT entry: ", err) + } + }(v) + } + } + + return best, nil +} + +func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) { + var vals []routing.RecvdVal + var valslock sync.Mutex + + // If we have it local, dont bother doing an RPC! + lrec, err := dht.getLocal(key) + if err == nil { + // TODO: this is tricky, we dont always want to trust our own value + // what if the authoritative source updated it? + log.Debug("have it locally") + vals = append(vals, routing.RecvdVal{ + Val: lrec.GetValue(), + From: dht.self, + }) + + if nvals <= 1 { + return vals, nil + } + } else if nvals == 0 { + return nil, err + } + + // get closest peers in the routing table + rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + log.Debugf("peers in rt: %s", len(rtp), rtp) + if len(rtp) == 0 { + log.Warning("No peers from routing table!") + return nil, kb.ErrLookupFailure + } + + // setup the Query + parent := ctx + query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) + + rec, peers, err := dht.getValueOrPeers(ctx, p, key) + switch err { + case routing.ErrNotFound: + // in this case, they responded with nothing, + // still send a notification so listeners can know the + // request has completed 'successfully' + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + }) + return nil, err + default: + return nil, err + + case nil, errInvalidRecord: + // in either of these cases, we want to keep going + } + + res := &dhtQueryResult{closerPeers: peers} + + if rec.GetValue() != nil || err == errInvalidRecord { + rv := routing.RecvdVal{ + Val: rec.GetValue(), + From: p, + } + valslock.Lock() + vals = append(vals, rv) + + // If weve collected enough records, we're done + if len(vals) >= nvals { + res.success = true + } + valslock.Unlock() + } + + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + Responses: pointerizePeerInfos(peers), + }) + + return res, nil + }) + + // run it! 
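+ // a query error is only surfaced when no values were collected at all;
+ // partial results take precedence over a failed lookup.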
+ _, err = query.Run(ctx, rtp) + if len(vals) == 0 { + if err != nil { + return nil, err + } + } + + return vals, nil + +} + +// Value provider layer of indirection. +// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. + +// Provide makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { + defer log.EventBegin(ctx, "provide", &key).Done() + + // add self locally + dht.providers.AddProvider(ctx, key, dht.self) + + peers, err := dht.GetClosestPeers(ctx, key) + if err != nil { + return err + } + + mes, err := dht.makeProvRecord(key) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + for p := range peers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + log.Debugf("putProvider(%s, %s)", key, p) + err := dht.sendMessage(ctx, p, mes) + if err != nil { + log.Debug(err) + } + }(p) + } + wg.Wait() + return nil +} +func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) { + pi := pstore.PeerInfo{ + ID: dht.self, + Addrs: dht.host.Addrs(), + } + + // // only share WAN-friendly addresses ?? + // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs) + if len(pi.Addrs) < 1 { + return nil, fmt.Errorf("no known addresses for self. cannot put provider.") + } + + pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(skey), 0) + pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi}) + return pmes, nil +} + +// FindProviders searches until the context expires. +func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.PeerInfo, error) { + var providers []pstore.PeerInfo + for p := range dht.FindProvidersAsync(ctx, key, KValue) { + providers = append(providers, p) + } + return providers, nil +} + +// FindProvidersAsync is the same thing as FindProviders, but returns a channel. +// Peers will be returned on the channel as soon as they are found, even before +// the search query completes. 
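+// The returned channel is buffered to 'count' entries and is closed once the
+// search finishes or the context is cancelled.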
+func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key key.Key, count int) <-chan pstore.PeerInfo { + log.Event(ctx, "findProviders", &key) + peerOut := make(chan pstore.PeerInfo, count) + go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) + return peerOut +} + +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, count int, peerOut chan pstore.PeerInfo) { + defer log.EventBegin(ctx, "findProvidersAsync", &key).Done() + defer close(peerOut) + + ps := pset.NewLimited(count) + provs := dht.providers.GetProviders(ctx, key) + for _, p := range provs { + // NOTE: Assuming that this list of peers is unique + if ps.TryAdd(p) { + select { + case peerOut <- dht.peerstore.PeerInfo(p): + case <-ctx.Done(): + return + } + } + + // If we have enough peers locally, dont bother with remote RPC + if ps.Size() >= count { + return + } + } + + // setup the Query + parent := ctx + query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.SendingQuery, + ID: p, + }) + pmes, err := dht.findProvidersSingle(ctx, p, key) + if err != nil { + return nil, err + } + + log.Debugf("%d provider entries", len(pmes.GetProviderPeers())) + provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) + log.Debugf("%d provider entries decoded", len(provs)) + + // Add unique providers from request, up to 'count' + for _, prov := range provs { + log.Debugf("got provider: %s", prov) + if ps.TryAdd(prov.ID) { + log.Debugf("using provider: %s", prov) + select { + case peerOut <- prov: + case <-ctx.Done(): + log.Debug("context timed out sending more providers") + return nil, ctx.Err() + } + } + if ps.Size() >= count { + log.Debugf("got enough providers (%d/%d)", ps.Size(), count) + return &dhtQueryResult{success: true}, nil + } + } + + // Give closer peers back to the query to be queried + closer := pmes.GetCloserPeers() + clpeers := pb.PBPeersToPeerInfos(closer) + log.Debugf("got closer peers: %d %s", len(clpeers), clpeers) + + notif.PublishQueryEvent(parent, ¬if.QueryEvent{ + Type: notif.PeerResponse, + ID: p, + Responses: pointerizePeerInfos(clpeers), + }) + return &dhtQueryResult{closerPeers: clpeers}, nil + }) + + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) + _, err := query.Run(ctx, peers) + if err != nil { + log.Debugf("Query error: %s", err) + // Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032 + if fmt.Sprint(err) == "" { + log.Error("reproduced bug 3032:") + log.Errorf("Errors type information: %#v", err) + log.Errorf("go version: %s", runtime.Version()) + log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032") + + // replace problematic error with something that won't crash the daemon + err = fmt.Errorf("") + } + notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ + Type: notif.QueryError, + Extra: err.Error(), + }) + } +} + +// FindPeer searches for a peer with given ID. +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) { + defer log.EventBegin(ctx, "FindPeer", id).Done() + + // Check if were already connected to them + if pi := dht.FindLocal(id); pi.ID != "" { + return pi, nil + } + + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue) + if len(peers) == 0 { + return pstore.PeerInfo{}, kb.ErrLookupFailure + } + + // Sanity... 
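+ // if the target is already among the closest peers we know of, return it
+ // directly without querying the network.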
+	for _, p := range peers {
+		if p == id {
+			log.Debug("found target peer in list of closest peers...")
+			return dht.peerstore.PeerInfo(p), nil
+		}
+	}
+
+	// set up the query
+	parent := ctx
+	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
+		notif.PublishQueryEvent(parent, &notif.QueryEvent{
+			Type: notif.SendingQuery,
+			ID:   p,
+		})
+
+		pmes, err := dht.findPeerSingle(ctx, p, id)
+		if err != nil {
+			return nil, err
+		}
+
+		closer := pmes.GetCloserPeers()
+		clpeerInfos := pb.PBPeersToPeerInfos(closer)
+
+		// see if we got the peer we were looking for
+		for _, npi := range clpeerInfos {
+			if npi.ID == id {
+				return &dhtQueryResult{
+					peer:    npi,
+					success: true,
+				}, nil
+			}
+		}
+
+		notif.PublishQueryEvent(parent, &notif.QueryEvent{
+			Type:      notif.PeerResponse,
+			Responses: pointerizePeerInfos(clpeerInfos),
+		})
+
+		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
+	})
+
+	// run it!
+	result, err := query.Run(ctx, peers)
+	if err != nil {
+		return pstore.PeerInfo{}, err
+	}
+
+	log.Debugf("FindPeer %v %v", id, result.success)
+	if result.peer.ID == "" {
+		return pstore.PeerInfo{}, routing.ErrNotFound
+	}
+
+	return result.peer, nil
+}
+
+// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
+func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan pstore.PeerInfo, error) {
+
+	peerchan := make(chan pstore.PeerInfo, asyncQueryBuffer)
+	peersSeen := make(map[peer.ID]struct{})
+
+	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
+	if len(peers) == 0 {
+		return nil, kb.ErrLookupFailure
+	}
+
+	// set up the query
+	query := dht.newQuery(key.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
+
+		pmes, err := dht.findPeerSingle(ctx, p, id)
+		if err != nil {
+			return nil, err
+		}
+
+		var clpeers []pstore.PeerInfo
+		closer := pmes.GetCloserPeers()
+		for _, pbp := range closer {
+			pi := pb.PBPeerToPeerInfo(pbp)
+
+			// skip peers already seen
+			if _, found := peersSeen[pi.ID]; found {
+				continue
+			}
+			peersSeen[pi.ID] = struct{}{}
+
+			// if the peer is connected, send it to our client.
+			if pb.Connectedness(*pbp.Connection) == inet.Connected {
+				select {
+				case <-ctx.Done():
+					return nil, ctx.Err()
+				case peerchan <- pi:
+				}
+			}
+
+			// if the peer is already connected, don't bother querying it further.
+			// TODO maybe query it?
+			if pb.Connectedness(*pbp.Connection) != inet.Connected {
+				clpeers = append(clpeers, pi)
+			}
+		}
+
+		return &dhtQueryResult{closerPeers: clpeers}, nil
+	})
+
+	// run it! run it asynchronously to generate peers as results are found.
+	// this does no error checking
+	go func() {
+		if _, err := query.Run(ctx, peers); err != nil {
+			log.Debug(err)
+		}
+
+		// close the peerchan channel when done.
+		close(peerchan)
+	}()
+
+	return peerchan, nil
+}
diff --git a/util.go b/util.go
new file mode 100644
index 00000000000..a605759a955
--- /dev/null
+++ b/util.go
@@ -0,0 +1,39 @@
+package dht
+
+import (
+	"sync"
+)
+
+// PoolSize is the number of nodes used for group find/set RPC calls.
+var PoolSize = 6
+
+// KValue is the maximum number of requests to perform before returning failure.
+var KValue = 20
+
+// AlphaValue is the concurrency factor for asynchronous requests.
+var AlphaValue = 3
+
+// counter is a mutex-guarded counter safe for concurrent increment, decrement, and read.
+type counter struct {
+	n   int
+	mut sync.Mutex
+}
+
+func (c *counter) Increment() {
+	c.mut.Lock()
+	c.n++
+	c.mut.Unlock()
+}
+
+func (c *counter) Decrement() {
+	c.mut.Lock()
+	c.n--
+	c.mut.Unlock()
+}
+
+func (c *counter) Size() (s int) {
+	c.mut.Lock()
+	s = c.n
+	c.mut.Unlock()
+	return
+}
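
For orientation only (not part of the patch): a minimal sketch of how a caller might use the provider API introduced above, assuming an already-constructed *IpfsDHT named dht and a key k. The helper name is hypothetical and host/datastore setup is elided.

func exampleProvideAndFindProviders(ctx context.Context, dht *IpfsDHT, k key.Key) error {
	// Announce that this node can provide the value for k: Provide stores a
	// provider record locally and then sends ADD_PROVIDER to the closest peers.
	if err := dht.Provide(ctx, k); err != nil {
		return err
	}

	// Stream providers as they are discovered. The channel is closed once the
	// query finishes, the context expires, or KValue providers have been found.
	for pi := range dht.FindProvidersAsync(ctx, k, KValue) {
		log.Debugf("found provider %s (%d addrs)", pi.ID.Pretty(), len(pi.Addrs))
	}
	return nil
}

FindProviders wraps exactly this loop, so callers that only want the collected slice can use it directly instead of ranging over the channel.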