Skip to content

Commit

Permalink
rapide: fix termination logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jan 31, 2023
1 parent 59302e9 commit 6539fe9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
29 changes: 28 additions & 1 deletion rapide/rapide.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rapide
import (
"context"
"fmt"
"io"
"sync"

"github.com/ipfs/go-cid"
Expand All @@ -28,6 +29,7 @@ func (c *Client) Get(ctx context.Context, root cid.Cid, traversal ipsl.Traversal
out := make(chan blocks.BlockOrError)
d := &download{
out: out,
ctx: ctx,
cancel: cancel,
root: node{
state: todo,
Expand All @@ -45,20 +47,39 @@ func (c *Client) Get(ctx context.Context, root cid.Cid, traversal ipsl.Traversal

type download struct {
out chan<- blocks.BlockOrError
ctx context.Context
cancel context.CancelFunc
root node
closeOnce sync.Once
}

// err cuts out the download and make it return an error, this is intended for unrecoverable errors.
func (d *download) err(err error) {
d.closeOnce.Do(func() {
select {
case d.out <- blocks.IsNot(err):
case <-d.ctx.Done():
}
d.cancel()
close(d.out)
})
}

func (d *download) finish() {
d.closeOnce.Do(func() {
d.cancel()
d.out <- blocks.IsNot(err)
close(d.out)
})
}

func (d *download) workerFinished() {
d.root.mu.Lock()
defer d.root.mu.Unlock()
if d.root.state == done && len(d.root.childrens) == 0 {
d.finish() // file was downloaded !
}
}

type node struct {
// parent is not protected by the mutex and is readonly after creation
parent *node
Expand Down Expand Up @@ -99,6 +120,12 @@ func (n *node) expand(d *download, b blocks.Block) error {
n.childrens = childrens

for node, parent := n, n.parent; len(node.childrens) == 0; node, parent = parent, parent.parent {
if parent == nil {
// finished!
d.finish()
return io.EOF
}

// nothing to do, backtrack
parent.mu.Lock()
for i, v := range parent.childrens {
Expand Down
25 changes: 15 additions & 10 deletions rapide/serverdriven.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func (d *download) startServerDrivenWorker(ctx context.Context, impl ServerDrive
}

func (w *serverDrivenWorker) work(ctx context.Context) {
defer w.resetCurrentNodesWorkState()
defer w.download.workerFinished()
defer w.resetCurrentChildsNodeWorkState()

workLoop:
for {
Expand All @@ -59,7 +60,7 @@ workLoop:
for {
if len(tasks) == 0 {
cancelCurrentRequest()
w.resetCurrentNodesWorkState()
w.resetCurrentChildsNodeWorkState()
continue workLoop
}
b, err := stream.Next()
Expand All @@ -70,7 +71,7 @@ workLoop:
return // request canceled
case err == io.EOF:
cancelCurrentRequest()
w.resetCurrentNodesWorkState()
w.resetCurrentChildsNodeWorkState()
continue workLoop
default:
// FIXME: support ignoring erroring parts of the tree when searching (dontGoThere)
Expand Down Expand Up @@ -99,13 +100,13 @@ workLoop:
task.mu.Unlock()
// we finished all parts of our tree, cancel current work and restart a new request.
cancelCurrentRequest()
w.resetCurrentNodesWorkState()
w.resetCurrentChildsNodeWorkState()
continue workLoop
}
if err := task.expand(w.download, b); err != nil {
task.mu.Unlock()
cancelCurrentRequest()
w.resetCurrentNodesWorkState()
w.resetCurrentChildsNodeWorkState()
return
}

Expand Down Expand Up @@ -210,16 +211,20 @@ func (w *serverDrivenWorker) findWork() (cid.Cid, ipsl.Traversal, bool) {
}
}

// resetCurrentNodesWorkState updates the state of the current node to longer count towards it.
func (w *serverDrivenWorker) resetCurrentNodesWorkState() {
// resetCurrentChildsNodeWorkState updates the state of the current node to longer count towards it.
func (w *serverDrivenWorker) resetCurrentChildsNodeWorkState() {
c := w.current
if c == nil {
return // nothing to do
}
w.current = nil

// recursively walk the state and remove ourself from counters
w.recurseCancelNode(c)
// This is pretty contensius but that should be fine because server driven downloads should be cancel rarely, also most of thoses are gonna go on the fast path anyway. c.mu.Lock()
c.mu.Lock()
defer c.mu.Unlock()
for _, child := range c.childrens {
w.recurseCancelNode(child)
}
}

func (w *serverDrivenWorker) recurseCancelNode(c *node) {
Expand All @@ -229,11 +234,11 @@ func (w *serverDrivenWorker) recurseCancelNode(c *node) {

// This is pretty contensius but that should be fine because server driven downloads should be cancel rarely, also most of thoses are gonna go on the fast path anyway. c.mu.Lock()
c.mu.Lock()
defer c.mu.Unlock()
c.workers -= 1
for _, child := range c.childrens {
w.recurseCancelNode(child)
}
defer c.mu.Unlock()
}

func (w *serverDrivenWorker) isOurTask(c *node) bool {
Expand Down

0 comments on commit 6539fe9

Please sign in to comment.