Skip to content

Commit

Permalink
bitswap: extract findProviders to providers
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Nov 30, 2017
1 parent 1ad5ae1 commit 630ceb2
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 198 deletions.
6 changes: 3 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.PeerHost = rhost.Wrap(host, n.Routing)

// Wrap content routing with a buffering layer
n.Providers = provider.NewProviders(ctx, n.Routing)
n.Providers = provider.NewProviders(ctx, n.Routing, n.PeerHost)

// setup exchange service
const alwaysSendToPeer = true // use YesManStrategy
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing, n.Providers)
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, n.Providers, alwaysSendToPeer)

size, err := n.getCacheSize()
if err != nil {
Expand Down
45 changes: 14 additions & 31 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,13 @@ import (
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"

"github.com/ipfs/go-ipfs/providers"
)

var log = logging.Logger("bitswap")

const (
// maxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
)
Expand All @@ -53,7 +48,7 @@ var rebroadcastDelay = delay.Fixed(time.Minute)
// delegate.
// Runs until context is cancelled.
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore, nice bool) exchange.Interface {
bstore blockstore.Blockstore, providers providers.Interface, nice bool) exchange.Interface {

// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
Expand All @@ -77,10 +72,10 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,

bs := &Bitswap{
blockstore: bstore,
providers: providers,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
wm: NewWantManager(ctx, network),
counters: new(counters),
Expand Down Expand Up @@ -121,13 +116,13 @@ type Bitswap struct {
// NB: ensure threadsafety
blockstore blockstore.Blockstore

// providers implement content routing logic
providers providers.Interface

// notifications engine for receiving new blocks and routing them to the
// appropriate user requests
notifications notifications.PubSub

// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest

process process.Process

// Counters for various statistics
Expand Down Expand Up @@ -156,11 +151,6 @@ type counters struct {
messagesRecvd uint64
}

type blockRequest struct {
Cid *cid.Cid
Ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -208,14 +198,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block

bs.wm.WantBlocks(ctx, keys, nil, mses)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &blockRequest{
Cid: keys[0],
Ctx: ctx,
}

remaining := cid.NewSet()
for _, k := range keys {
remaining.Add(k)
Expand Down Expand Up @@ -250,12 +232,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
}
}()

select {
case bs.findKeys <- req:
return out, nil
case <-ctx.Done():
return nil, ctx.Err()
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
if err := bs.providers.FindProviders(ctx, keys[0]); err != nil {
return nil, err
}
return out, nil
}

func (bs *Bitswap) getNextSessionID() uint64 {
Expand Down Expand Up @@ -367,7 +350,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
if err := bs.receiveBlockFrom(b, p); err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
}
if err := bs.network.Provide(ctx, b.Cid()); err != nil {
if err := bs.providers.Provide(b.Cid()); err != nil {
log.Warningf("ReceiveMessage Provide error: %s", err)
}
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
Expand Down
11 changes: 0 additions & 11 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
ifconnmgr "gx/ipfs/QmWfkNorhirGE1Qp3VwBWcnGaj4adv4hNqCYwabMrEYc21/go-libp2p-interface-connmgr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
Expand Down Expand Up @@ -37,8 +36,6 @@ type BitSwapNetwork interface {
NewMessageSender(context.Context, peer.ID) (MessageSender, error)

ConnectionManager() ifconnmgr.ConnManager

Routing
}

type MessageSender interface {
Expand All @@ -60,11 +57,3 @@ type Receiver interface {
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
}

type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID

// Provide provides the key to the network
Provide(context.Context, *cid.Cid) error
}
56 changes: 5 additions & 51 deletions exchange/bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"time"

bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
provider "github.com/ipfs/go-ipfs/providers"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
Expand All @@ -26,11 +24,10 @@ var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, p provider.Interface) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
bitswapNetwork := impl{
host: host,
routing: r,
provider: p,
host: host,
routing: r,
}
host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
host.SetStreamHandler(ProtocolBitswapOne, bitswapNetwork.handleNewStream)
Expand All @@ -44,9 +41,8 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting, p provider.Interf
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
type impl struct {
host host.Host
routing routing.ContentRouting
provider provider.Interface
host host.Host
routing routing.ContentRouting

// inbound messages from the network are forwarded to the receiver
receiver Receiver
Expand Down Expand Up @@ -139,48 +135,6 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
return bsnet.host.Connect(ctx, pstore.PeerInfo{ID: p})
}

// FindProvidersAsync returns a channel of providers for the given key
// TODO: move this and other FindProvider stuff out to exch.provider
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {

// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
// precisely tracking which peers provide certain keys. This optimization
// would be misleading. In the long run, this may not be the most
// appropriate place for this optimization, but it won't cause any harm in
// the short term.
connectedPeers := bsnet.host.Network().Peers()
out := make(chan peer.ID, len(connectedPeers)) // just enough buffer for these connectedPeers
for _, id := range connectedPeers {
if id == bsnet.host.ID() {
continue // ignore self as provider
}
out <- id
}

go func() {
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
if info.ID == bsnet.host.ID() {
continue // ignore self as provider
}
bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, pstore.TempAddrTTL)
select {
case <-ctx.Done():
return
case out <- info.ID:
}
}
}()
return out
}

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error {
return bsnet.provider.Provide(k)
}

// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) {
defer s.Close()
Expand Down
4 changes: 3 additions & 1 deletion exchange/bitswap/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,12 @@ func (s *Session) run(ctx context.Context) {
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) {
for p := range s.bs.providers.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}

}(live[0])

}
s.resetTick()
case p := <-newpeers:
Expand Down
5 changes: 4 additions & 1 deletion exchange/bitswap/testnet/interface.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package bitswap

import (
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
"gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"

bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
"github.com/ipfs/go-ipfs/providers"
)

type Network interface {
Adapter(testutil.Identity) bsnet.BitSwapNetwork
Providers() providers.Interface

HasPeer(peer.ID) bool
}
19 changes: 13 additions & 6 deletions exchange/bitswap/testnet/peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ package bitswap
import (
"context"

bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
pr "github.com/ipfs/go-ipfs/providers"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
mockpeernet "gx/ipfs/QmTzs3Gp2rU3HuNayjBVG7qBgbaKWE8bgtwJ7faRxAe9UP/go-libp2p/p2p/net/mock"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"

bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
pr "github.com/ipfs/go-ipfs/providers"
providers "github.com/ipfs/go-ipfs/providers"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
)

type peernet struct {
mockpeernet.Mocknet
routingserver mockrouting.Server
providers providers.Interface
}

func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) {
return &peernet{net, rs}, nil
return &peernet{net, rs, nil}, nil
}

func (pn *peernet) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
Expand All @@ -27,9 +30,9 @@ func (pn *peernet) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
panic(err.Error())
}
routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
provider := pr.NewProviders(context.TODO(), routing)
pn.providers = pr.NewProviders(context.TODO(), routing, client)

return bsnet.NewFromIpfsHost(client, routing, provider)
return bsnet.NewFromIpfsHost(client, routing)
}

func (pn *peernet) HasPeer(p peer.ID) bool {
Expand All @@ -41,4 +44,8 @@ func (pn *peernet) HasPeer(p peer.ID) bool {
return false
}

func (pn *peernet) Providers() providers.Interface {
return pn.providers
}

var _ Network = (*peernet)(nil)
35 changes: 7 additions & 28 deletions exchange/bitswap/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
ifconnmgr "gx/ipfs/QmWfkNorhirGE1Qp3VwBWcnGaj4adv4hNqCYwabMrEYc21/go-libp2p-interface-connmgr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"

"github.com/ipfs/go-ipfs/exchange/offline"
"github.com/ipfs/go-ipfs/providers"
)

var log = logging.Logger("bstestnet")
Expand Down Expand Up @@ -50,6 +52,10 @@ func (n *network) HasPeer(p peer.ID) bool {
return found
}

func (n *network) Providers() providers.Interface {
return offline.Providers() //TODO: probably need to pass regular/mock providers here
}

// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
Expand Down Expand Up @@ -97,28 +103,6 @@ func (nc *networkClient) SendMessage(
return nc.network.SendMessage(ctx, nc.local, to, message)
}

// FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {

// NB: this function duplicates the PeerInfo -> ID transformation in the
// bitswap network adapter. Not to worry. This network client will be
// deprecated once the ipfsnet.Mock is added. The code below is only
// temporary.

out := make(chan peer.ID)
go func() {
defer close(out)
providers := nc.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
select {
case <-ctx.Done():
case out <- info.ID:
}
}
}()
return out
}

func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
return &ifconnmgr.NullConnMgr{}
}
Expand Down Expand Up @@ -151,11 +135,6 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.
}, nil
}

// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k *cid.Cid) error {
return nc.routing.Provide(ctx, k, true)
}

func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
Expand Down
Loading

0 comments on commit 630ceb2

Please sign in to comment.