Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify logic for MFS remote pinning #10506

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/ipfs/kubo/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ take effect.
prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node})

// start MFS pinning thread
startPinMFS(daemonConfigPollInterval, cctx, &ipfsPinMFSNode{node})
startPinMFS(cctx, daemonConfigPollInterval, &ipfsPinMFSNode{node})

// The daemon is *finally* ready.
fmt.Printf("Daemon is ready\n")
Expand Down
167 changes: 65 additions & 102 deletions cmd/ipfs/kubo/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
pinclient "github.com/ipfs/boxo/pinning/remote/client"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"

config "github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core"
Expand Down Expand Up @@ -40,6 +40,7 @@ func init() {
d, err := time.ParseDuration(pollDurStr)
if err != nil {
mfslog.Error("error parsing MFS_PIN_POLL_INTERVAL, using default:", err)
return
}
daemonConfigPollInterval = d
}
Expand Down Expand Up @@ -74,87 +75,58 @@ func (x *ipfsPinMFSNode) PeerHost() host.Host {
return x.node.PeerHost
}

func startPinMFS(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode) {
errCh := make(chan error)
go pinMFSOnChange(configPollInterval, cctx, node, errCh)
go func() {
for {
select {
case err, isOpen := <-errCh:
if !isOpen {
return
}
mfslog.Errorf("%v", err)
case <-cctx.Context().Done():
return
}
}
}()
func startPinMFS(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
go pinMFSOnChange(cctx, configPollInterval, node)
}

func pinMFSOnChange(configPollInterval time.Duration, cctx pinMFSContext, node pinMFSNode, errCh chan<- error) {
defer close(errCh)

var tmo *time.Timer
defer func() {
if tmo != nil {
tmo.Stop()
}
}()
func pinMFSOnChange(cctx pinMFSContext, configPollInterval time.Duration, node pinMFSNode) {
tmo := time.NewTimer(configPollInterval)
defer tmo.Stop()

lastPins := map[string]lastPin{}
for {
// polling sleep
if tmo == nil {
tmo = time.NewTimer(configPollInterval)
} else {
tmo.Reset(configPollInterval)
}
select {
case <-cctx.Context().Done():
return
case <-tmo.C:
tmo.Reset(configPollInterval)
}

// reread the config, which may have changed in the meantime
cfg, err := cctx.GetConfig()
if err != nil {
select {
case errCh <- fmt.Errorf("pinning reading config (%v)", err):
case <-cctx.Context().Done():
return
}
mfslog.Errorf("pinning reading config (%v)", err)
continue
}
mfslog.Debugf("pinning loop is awake, %d remote services", len(cfg.Pinning.RemoteServices))

// get the most recent MFS root cid
rootNode, err := node.RootNode()
if err != nil {
select {
case errCh <- fmt.Errorf("pinning reading MFS root (%v)", err):
case <-cctx.Context().Done():
return
}
mfslog.Errorf("pinning reading MFS root (%v)", err)
continue
}
rootCid := rootNode.Cid()

// pin to all remote services in parallel
pinAllMFS(cctx.Context(), node, cfg, rootCid, lastPins, errCh)
pinAllMFS(cctx.Context(), node, cfg, rootNode.Cid(), lastPins)
}
}

// pinAllMFS pins on all remote services in parallel to overcome DoS attacks.
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin, errCh chan<- error) {
ch := make(chan lastPin, len(cfg.Pinning.RemoteServices))
for svcName_, svcConfig_ := range cfg.Pinning.RemoteServices {
func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid cid.Cid, lastPins map[string]lastPin) {
ch := make(chan lastPin)
var started int

for svcName, svcConfig := range cfg.Pinning.RemoteServices {
if ctx.Err() != nil {
break
}

// skip services where MFS is not enabled
svcName, svcConfig := svcName_, svcConfig_
mfslog.Debugf("pinning MFS root considering service %q", svcName)
if !svcConfig.Policies.MFS.Enable {
mfslog.Debugf("pinning service %q is not enabled", svcName)
ch <- lastPin{}
continue
}
// read mfs pin interval for this service
Expand All @@ -165,11 +137,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
var err error
repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval)
if err != nil {
select {
case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err):
case <-ctx.Done():
}
ch <- lastPin{}
mfslog.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err)
continue
}
}
Expand All @@ -182,38 +150,30 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
} else {
mfslog.Debugf("pinning MFS root to %q: skipped due to MFS.RepinInterval=%s (remaining: %s)", svcName, repinInterval.String(), (repinInterval - time.Since(last.Time)).String())
}
ch <- lastPin{}
continue
}
}

mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName)
go func() {
if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil {
select {
case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err):
case <-ctx.Done():
}
ch <- lastPin{}
} else {
ch <- r
go func(svcName string, svcConfig config.RemotePinningService) {
r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig)
if err != nil {
mfslog.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err)
}
}()
ch <- r
}(svcName, svcConfig)
started++
}
for i := 0; i < len(cfg.Pinning.RemoteServices); i++ {

// Collect results from all started goroutines.
for i := 0; i < started; i++ {
if x := <-ch; x.IsValid() {
lastPins[x.ServiceName] = x
}
}
}

func pinMFS(
ctx context.Context,
node pinMFSNode,
cid cid.Cid,
svcName string,
svcConfig config.RemotePinningService,
) (lastPin, error) {
func pinMFS(ctx context.Context, node pinMFSNode, cid cid.Cid, svcName string, svcConfig config.RemotePinningService) (lastPin, error) {
c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key)

pinName := svcConfig.Policies.MFS.PinName
Expand Down Expand Up @@ -243,43 +203,46 @@ func pinMFS(
}
for range lsPinCh { // in case the prior loop exits early
}
if err := <-lsErrCh; err != nil {
err := <-lsErrCh
if err != nil {
return lastPin{}, fmt.Errorf("error while listing remote pins: %v", err)
}

// CID of the current MFS root is already being pinned, nothing to do
if pinning {
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil
}

// Prepare Pin.name
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}
if !pinning {
// Prepare Pin.name
addOpts := []pinclient.AddOption{pinclient.PinOpts.WithName(pinName)}

// Prepare Pin.origins
// Add own multiaddrs to the 'origins' array, so Pinning Service can
// use that as a hint and connect back to us (if possible)
if node.PeerHost() != nil {
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
if err != nil {
return lastPin{}, err
// Prepare Pin.origins
// Add own multiaddrs to the 'origins' array, so Pinning Service can
// use that as a hint and connect back to us (if possible)
if node.PeerHost() != nil {
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(node.PeerHost()))
if err != nil {
return lastPin{}, err
}
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
}
addOpts = append(addOpts, pinclient.PinOpts.WithOrigins(addrs...))
}

// Create or replace pin for MFS root
if existingRequestID != "" {
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
_, err := c.Replace(ctx, existingRequestID, cid, addOpts...)
if err != nil {
return lastPin{}, err
// Create or replace pin for MFS root
if existingRequestID != "" {
mfslog.Debugf("pinning to %q: replacing existing MFS root pin with %q", svcName, cid)
if _, err = c.Replace(ctx, existingRequestID, cid, addOpts...); err != nil {
return lastPin{}, err
}
} else {
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
if _, err = c.Add(ctx, cid, addOpts...); err != nil {
return lastPin{}, err
}
}
} else {
mfslog.Debugf("pinning to %q: creating a new MFS root pin for %q", svcName, cid)
_, err := c.Add(ctx, cid, addOpts...)
if err != nil {
return lastPin{}, err
}
mfslog.Debugf("pinning MFS to %q: pin for %q exists since %s, skipping", svcName, cid, pinTime.String())
}
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil

return lastPin{
Time: pinTime,
ServiceName: svcName,
ServiceConfig: svcConfig,
CID: cid,
}, nil
}
Loading
Loading