Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: Do not ignore+detach shards with failed Open/Init ops #2464

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Changelog for NeoFS Node
- Skip unexpected notary events on notary request parsing step (#2315)
- Session inactivity on object PUT request relay (#2460)
- Missing connection retries on IR node startup when the first configured mainnet RPC node is not in sync (#2474)
- Storage node no longer ignores unhealthy shards on startup (#2464)

### Removed
- Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400)
Expand Down
10 changes: 2 additions & 8 deletions pkg/local_object_storage/blobstor/control.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blobstor

import (
"errors"
"fmt"

"go.uber.org/zap"
Expand All @@ -14,20 +13,15 @@ func (b *BlobStor) Open(readOnly bool) error {
for i := range b.storage {
err := b.storage[i].Storage.Open(readOnly)
if err != nil {
return err
return fmt.Errorf("open substorage %s: %w", b.storage[i].Storage.Type(), err)
}
}
return nil
}

// ErrInitBlobovniczas is returned when blobovnicza initialization fails.
var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stage")

// Init initializes internal data structures and system resources.
//
// If BlobStor is already initialized, no action is taken.
//
// Returns wrapped ErrInitBlobovniczas on blobovnicza tree's initializaiton failure.
func (b *BlobStor) Init() error {
b.log.Debug("initializing...")

Expand All @@ -38,7 +32,7 @@ func (b *BlobStor) Init() error {
for i := range b.storage {
err := b.storage[i].Storage.Init()
if err != nil {
return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err)
return fmt.Errorf("init substorage %s: %w", b.storage[i].Storage.Type(), err)
}
}
return nil
Expand Down
86 changes: 4 additions & 82 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,11 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"go.uber.org/zap"
)

type shardInitError struct {
err error
id string
}

// Open opens all StorageEngine's components.
func (e *StorageEngine) Open() error {
return e.open()
Expand All @@ -26,41 +19,9 @@ func (e *StorageEngine) open() error {
e.mtx.Lock()
defer e.mtx.Unlock()

var wg sync.WaitGroup
var errCh = make(chan shardInitError, len(e.shards))
carpawell marked this conversation as resolved.
Show resolved Hide resolved

for id, sh := range e.shards {
wg.Add(1)
go func(id string, sh *shard.Shard) {
defer wg.Done()
if err := sh.Open(); err != nil {
errCh <- shardInitError{
err: err,
id: id,
}
}
}(id, sh.Shard)
}
wg.Wait()
close(errCh)

for res := range errCh {
if res.err != nil {
e.log.Error("could not open shard, closing and skipping",
zap.String("id", res.id),
zap.Error(res.err))

sh := e.shards[res.id]
delete(e.shards, res.id)

err := sh.Close()
if err != nil {
e.log.Error("could not close partially initialized shard",
zap.String("id", res.id),
zap.Error(res.err))
}

continue
if err := sh.Open(); err != nil {
return fmt.Errorf("open shard %s: %w", id, err)
}
}

Expand All @@ -72,51 +33,12 @@ func (e *StorageEngine) Init() error {
e.mtx.Lock()
defer e.mtx.Unlock()

var wg sync.WaitGroup
var errCh = make(chan shardInitError, len(e.shards))

for id, sh := range e.shards {
wg.Add(1)
go func(id string, sh *shard.Shard) {
defer wg.Done()
if err := sh.Init(); err != nil {
errCh <- shardInitError{
err: err,
id: id,
}
}
}(id, sh.Shard)
}
wg.Wait()
close(errCh)

for res := range errCh {
if res.err != nil {
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
e.log.Error("could not initialize shard, closing and skipping",
zap.String("id", res.id),
zap.Error(res.err))

sh := e.shards[res.id]
delete(e.shards, res.id)

err := sh.Close()
if err != nil {
e.log.Error("could not close partially initialized shard",
zap.String("id", res.id),
zap.Error(res.err))
}

continue
}
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
if err := sh.Init(); err != nil {
return fmt.Errorf("init shard %s: %w", id, err)
}
}

if len(e.shards) == 0 {
return errors.New("failed initialization on all shards")
}

e.wg.Add(1)
go e.setModeLoop()

Expand Down
7 changes: 1 addition & 6 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,13 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
}
}

e.mtx.RLock()
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 0, shardCount)

require.NoError(t, os.Chmod(badDir, os.ModePerm))
require.NoError(t, e.Reload(ReConfiguration{
shards: map[string][]shard.Option{configID: s},
}))

e.mtx.RLock()
shardCount = len(e.shards)
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 1, shardCount)
}
Expand Down
Loading