Skip to content

Commit

Permalink
added support for an experimental batched provider system
Browse files Browse the repository at this point in the history
The batched provider system is enabled when the experimental AcceleratedDHTClient is enabled

There is also an `ipfs stats provide` command which gives stats about the providing/reproviding system when the batched provider system is enabled
  • Loading branch information
aschmahmann committed May 13, 2021
1 parent 3556ad2 commit dbdee7f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 9 deletions.
1 change: 1 addition & 0 deletions core/commands/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ for your IPFS node.`,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
},
}

Expand Down
51 changes: 51 additions & 0 deletions core/commands/stat_provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package commands

import (
"fmt"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"

"github.com/ipfs/go-ipfs-provider/batched"
)

var statProvideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
ShortDescription: `
Returns statistics about the content the node is advertising.
This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}

if !nd.IsOnline {
return ErrNotOnline
}

sys, ok := nd.Provider.(*batched.BatchProvidingSystem)
if !ok {
return fmt.Errorf("can only return stats if the experimental DHT client is enabled")
}

stats, err := sys.Stat(req.Context)
if err != nil {
return err
}

if err := res.Emit(stats); err != nil {
return err
}

return nil
},
Encoders: cmds.EncoderMap{},
Type: batched.BatchedProviderStats{},
}
4 changes: 2 additions & 2 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(p2p.New),

LibP2P(bcfg, cfg),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand All @@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(DNSResolver),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand Down
60 changes: 56 additions & 4 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (

"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-provider"
"github.com/ipfs/go-ipfs-provider/batched"
q "github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
)

Expand Down Expand Up @@ -59,29 +62,78 @@ func SimpleProviderSys(isOnline bool) interface{} {
}
}

type provideMany interface {
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
Ready() bool
}

// BatchedProviderSys creates new provider system
func BatchedProviderSys(isOnline bool, reprovideInterval string) interface{} {
return func(lc fx.Lifecycle, cr libp2p.BaseIpfsRouting, q *q.Queue, keyProvider simple.KeyChanFunc, repo repo.Repo) (provider.System, error) {
r, ok := (cr).(provideMany)
if !ok {
return nil, fmt.Errorf("BatchedProviderSys requires a content router that supports provideMany")
}

reproviderInterval := kReprovideFrequency
if reprovideInterval != "" {
dur, err := time.ParseDuration(reprovideInterval)
if err != nil {
return nil, err
}

reproviderInterval = dur
}

sys, err := batched.New(r, q,
batched.ReproviderInterval(reproviderInterval),
batched.Datastore(repo.Datastore()),
batched.KeyProvider(keyProvider))
if err != nil {
return nil, err
}

if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}

return sys, nil
}
}

// ONLINE/OFFLINE

// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
func OnlineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(true)),
maybeProvide(SimpleProviderSys(true), !useBatchedProviding),
maybeProvide(BatchedProviderSys(true, reprovideInterval), useBatchedProviding),
)
}

// OfflineProviders groups units managing provider routing records offline
func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
func OfflineProviders(useStrategicProviding bool, useBatchedProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(false)),
maybeProvide(SimpleProviderSys(false), true),
//maybeProvide(BatchedProviderSys(false, reprovideInterval), useBatchedProviding),
)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/ipfs/go-ipfs-keystore v0.0.2
github.com/ipfs/go-ipfs-pinner v0.1.1
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.4.3
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqt
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-provider v0.4.3 h1:k54OHXZcFBkhL6l3GnPS9PfpaLeLqZjVASG1bgfBdfQ=
github.com/ipfs/go-ipfs-provider v0.4.3/go.mod h1:rcQBVqfblDQRk5LaCtf2uxuKxMJxvKmF5pLS0pO4au4=
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024 h1:eYfdZ27ogtwfnwKdfphOwcQ7PEOjKqXlWzVOakK0a60=
github.com/ipfs/go-ipfs-provider v0.4.4-0.20210513014626-1c19caa05024/go.mod h1:kUMTf1R8c+KgWUWKTGSZiXCDZWMCkxCX3wyepk0cYEA=
github.com/ipfs/go-ipfs-routing v0.0.1/go.mod h1:k76lf20iKFxQTjcJokbPM9iBXVXVZhcOwc360N4nuKs=
github.com/ipfs/go-ipfs-routing v0.1.0 h1:gAJTT1cEeeLj6/DlLX6t+NxD9fQe2ymTO6qWRDI/HQQ=
github.com/ipfs/go-ipfs-routing v0.1.0/go.mod h1:hYoUkJLyAUKhF58tysKpids8RNDPO42BVMgK5dNsoqY=
Expand Down

0 comments on commit dbdee7f

Please sign in to comment.