Skip to content

Commit

Permalink
node/engine: do not skip objects if shards are busy
Browse files Browse the repository at this point in the history
If every shard's pool is overloaded with routines, choose the best one and try
to PUT an object to it 30 seconds. Closes #2871.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Oct 7, 2024
1 parent c8942f9 commit 03f1f87
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/local_object_storage/engine/evacuate.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ mainLoop:
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()})
if putDone || exists {
if putDone {
exists, err := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()})
if err == nil {
if !exists {
e.log.Debug("object is moved to another shard",
zap.String("from", sidList[n]),
zap.Stringer("to", shards[j].ID()),
Expand Down
76 changes: 62 additions & 14 deletions pkg/local_object_storage/engine/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package engine

import (
"errors"
"fmt"
"time"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -76,8 +79,14 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
}

finished := false
var bestShard hashedShard
var bestPool util.WorkerPool

e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
if ind == 0 {
bestShard = sh
}

e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
e.mtx.RUnlock()
Expand All @@ -86,28 +95,61 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
return false
}

putDone, exists := e.putToShard(sh, ind, pool, addr, prm)
finished = putDone || exists
exists, err := e.putToShard(sh, ind, pool, addr, prm)
finished = err == nil || exists
return finished
})

if !finished {
err = errPutShard
err = e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, prm)
if err != nil {
e.log.Warn("last stand to put object to the best shard",
zap.Stringer("addr", addr),
zap.Stringer("shard", bestShard.ID()),
zap.Error(err))

Check warning on line 109 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L104-L109

Added lines #L104 - L109 were not covered by tests

return PutRes{}, errPutShard

Check warning on line 111 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L111

Added line #L111 was not covered by tests
}
}

return PutRes{}, err
return PutRes{}, nil
}

func (e *StorageEngine) putToShardWithDeadLine(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) error {
const deadline = 30 * time.Second
timer := time.NewTimer(deadline)
defer timer.Stop()

Check warning on line 121 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L118-L121

Added lines #L118 - L121 were not covered by tests

const putCooldown = 100 * time.Millisecond
ticker := time.NewTicker(putCooldown)
defer ticker.Stop()

Check warning on line 125 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L123-L125

Added lines #L123 - L125 were not covered by tests

for {
select {
case <-timer.C:
return fmt.Errorf("could not put object within %s", deadline)
case <-ticker.C:
_, err := e.putToShard(sh, ind, pool, addr, prm)
if errors.Is(err, ants.ErrPoolOverload) {
ticker.Reset(putCooldown)
continue

Check warning on line 135 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L127-L135

Added lines #L127 - L135 were not covered by tests
}

return err

Check warning on line 138 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L138

Added line #L138 was not covered by tests
}
}
}

// putToShard puts object to sh.
// First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, bool) {
var putSuccess, alreadyExists bool
// Return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, error) {
var alreadyExists bool
var errGlobal error
id := sh.ID()

exitCh := make(chan struct{})

if err := pool.Submit(func() {
err := pool.Submit(func() {
defer close(exitCh)

var existPrm shard.ExistsPrm
Expand All @@ -124,8 +166,11 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
// object is already found but
// expired => do nothing with it
alreadyExists = true
return

Check warning on line 169 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L169

Added line #L169 was not covered by tests
}

errGlobal = err

Check warning on line 172 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L172

Added line #L172 was not covered by tests

return // this is not ErrAlreadyRemoved error so we can go to the next shard
}

Expand Down Expand Up @@ -159,6 +204,8 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool

_, err = sh.Put(putPrm)
if err != nil {
errGlobal = err

if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
e.log.Warn("could not put object to shard",
Expand All @@ -167,19 +214,20 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool
return
}

e.reportShardError(sh, "could not put object to shard", err)
e.reportShardError(sh, "could not put object to shard", errGlobal)

Check warning on line 217 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L217

Added line #L217 was not covered by tests
return
}

putSuccess = true
}); err != nil {
})
if err != nil {
e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err))

Check warning on line 222 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L222

Added line #L222 was not covered by tests
close(exitCh)

return false, err

Check warning on line 225 in pkg/local_object_storage/engine/put.go

View check run for this annotation

Codecov / codecov/patch

pkg/local_object_storage/engine/put.go#L225

Added line #L225 was not covered by tests
}

<-exitCh

return putSuccess, alreadyExists
return alreadyExists, errGlobal
}

// Put writes provided object to local storage.
Expand Down

0 comments on commit 03f1f87

Please sign in to comment.