Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Next generation of GC #4149

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ order to reclaim hard disk space.

streamErrors, _, _ := res.Request().Option("stream-errors").Bool()

gcOutChan := corerepo.GarbageCollectAsync(n, req.Context())
gcOutChan, err := corerepo.GarbageCollectAsync(req.Context(), n)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

outChan := make(chan interface{}, cap(gcOutChan))
res.SetOutput((<-chan interface{})(outChan))
Expand Down
29 changes: 17 additions & 12 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,13 @@ func BestEffortRoots(filesRoot *mfs.Root) ([]*cid.Cid, error) {
return []*cid.Cid{rootDag.Cid()}, nil
}

func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
// GarbageCollect runs a blocking garbage-collection pass over the node's
// blockstore, discarding every block not reachable from a pin source.
// It returns the first error produced either while setting up the
// collection or while draining its result stream.
func GarbageCollect(ctx context.Context, n *core.IpfsNode) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // release the derived context if setup or collection fails early

	rmed, err := GarbageCollectAsync(ctx, n)
	if err != nil {
		return err
	}

	// Drain the async result stream; CollectResult aggregates any per-block
	// errors reported during the sweep.
	return CollectResult(ctx, rmed, nil)
}
Expand Down Expand Up @@ -145,16 +144,22 @@ func (e *MultiError) Error() string {
return buf.String()
}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) <-chan gc.Result {
roots, err := BestEffortRoots(n.FilesRoot)
// GarbageCollectAsync starts a garbage-collection pass over the node's
// blockstore and returns a channel streaming per-block results as the
// sweep progresses. Setup failures (constructing the collector or
// registering pin sources) are reported synchronously via the error
// return; in that case the channel is nil.
func GarbageCollectAsync(ctx context.Context, n *core.IpfsNode) (<-chan gc.Result, error) {
	g, err := gc.NewGC(n.Blockstore, n.DAG)
	if err != nil {
		return nil, err
	}

	// Register everything the pinner wants kept alive.
	if err := g.AddPinSource(n.Pinning.PinSources()...); err != nil {
		return nil, err
	}
	// The MFS (files API) root must also survive collection.
	if err := g.AddPinSource(*n.FilesRoot.PinSource()); err != nil {
		return nil, err
	}

	return g.Run(ctx), nil
}

func PeriodicGC(ctx context.Context, node *core.IpfsNode) error {
Expand Down Expand Up @@ -217,7 +222,7 @@ func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
log.Info("Watermark exceeded. Starting repo GC...")
defer log.EventBegin(ctx, "repoGC").Done()

if err := GarbageCollect(gc.Node, ctx); err != nil {
if err := GarbageCollect(ctx, gc.Node); err != nil {
return err
}
log.Infof("Repo GC done. See `ipfs repo stat` to see how much space got freed.\n")
Expand Down
23 changes: 17 additions & 6 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/ipfs/go-ipfs/repo/config"
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
"gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
)

func TestAddRecursive(t *testing.T) {
Expand All @@ -46,6 +46,9 @@ func TestAddRecursive(t *testing.T) {
}

func TestAddGCLive(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

r := &repo.Mock{
C: config.Config{
Identity: config.Identity{
Expand All @@ -54,13 +57,13 @@ func TestAddGCLive(t *testing.T) {
},
D: ds2.ThreadSafeCloserMapDatastore(),
}
node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
if err != nil {
t.Fatal(err)
}

out := make(chan interface{})
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
adder, err := NewAdder(ctx, node.Pinning, node.Blockstore, node.DAG)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -98,11 +101,18 @@ func TestAddGCLive(t *testing.T) {
t.Fatal("add shouldnt complete yet")
}

g, err := gc.NewGC(node.Blockstore, node.DAG)
if err != nil {
t.Fatal(err)
}
g.AddPinSource(node.Pinning.PinSources()...)

var gcout <-chan gc.Result
gcstarted := make(chan struct{})

go func() {
defer close(gcstarted)
gcout = gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
gcout = g.Run(ctx)
}()

// gc shouldnt start until we let the add finish its current file.
Expand All @@ -126,6 +136,7 @@ func TestAddGCLive(t *testing.T) {
<-gcstarted

for r := range gcout {
t.Logf("gc res: %v", r)
if r.Error != nil {
t.Fatal(err)
}
Expand All @@ -144,11 +155,11 @@ func TestAddGCLive(t *testing.T) {
last = c
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx2, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG.GetLinks, last, set.Visit)
err = dag.EnumerateChildren(ctx2, node.DAG.GetLinks, last, set.Visit)
if err != nil {
t.Fatal(err)
}
Expand Down
19 changes: 19 additions & 0 deletions mfs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
Expand Down Expand Up @@ -123,6 +124,24 @@ func (kr *Root) Flush() error {
return nil
}

// PinSource returns information about pinning requirements.
func (kr *Root) PinSource() *pin.Source {
return &pin.Source{
Get: func() ([]*cid.Cid, error) {
err := kr.Flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this won't trigger a recursive GC. We take a lock, right? However, this can cause us to go over the GC limit even more. We should probably document the fact that the "max repo size" isn't really the max.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also get in a bad state here where we can't flush (no disk space) and can't GC (can't flush). Not sure if we can do something about it now but...

Copy link
Member Author

@Kubuxu Kubuxu Oct 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't much we can do here to recoverm I think, I need the actual state of the tree to perform the GC without data loss and I can't get it unless I flush the tree so it propagates the DAG updates.

if err != nil {
return nil, err
}

nd, err := kr.GetValue().GetNode()
if err != nil {
return nil, err
}
return []*cid.Cid{nd.Cid()}, nil
},
}
}

// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {
Expand Down
Loading