O_TMPFILE (nspcc-dev#2566)
Use a slightly more appropriate way to generate temporary files.
roman-khimov authored Sep 19, 2023
2 parents 366342d + 59250aa commit bb9256f
Showing 8 changed files with 286 additions and 241 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ Changelog for NeoFS Node

### Fixed

### Changed
- FSTree storage now uses more efficient and safe temporary files under Linux (#2566)

### Removed

### Updated
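The changelog entry above refers to Linux's O_TMPFILE flag for open(2), which creates an unnamed file inside a directory; the file can later be given its final name atomically with linkat(2). The commit's actual helper is not part of this page's excerpt, so the following is only a minimal sketch of the pattern using golang.org/x/sys/unix, with writeTmpFile as a hypothetical name:

package tmpfile

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// writeTmpFile sketches the O_TMPFILE pattern (Linux-only): write data to an
// unnamed file in dir, then atomically link it into place at path.
func writeTmpFile(dir, path string, data []byte) error {
	// O_TMPFILE creates an unnamed file inside dir: it is invisible to
	// concurrent readers and is reclaimed automatically if the process
	// crashes, so no partially written garbage is ever left behind.
	fd, err := unix.Open(dir, unix.O_TMPFILE|unix.O_WRONLY|unix.O_CLOEXEC, 0o640)
	if err != nil {
		return err
	}
	defer unix.Close(fd)

	if _, err := unix.Write(fd, data); err != nil {
		return err
	}

	// Give the fully written file a name. AT_SYMLINK_FOLLOW makes Linkat
	// resolve the /proc/self/fd/N magic link to the open file itself.
	return unix.Linkat(unix.AT_FDCWD, fmt.Sprintf("/proc/self/fd/%d", fd),
		unix.AT_FDCWD, path, unix.AT_SYMLINK_FOLLOW)
}

Because the file has no name until Linkat succeeds, readers can never observe partial data, and when two writers race for the same path the loser simply gets EEXIST and drops its unnamed file. That is presumably what lets this commit delete the numbered-suffix retry loop from fstree.go below.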
139 changes: 139 additions & 0 deletions pkg/local_object_storage/blobstor/bench_test.go
@@ -0,0 +1,139 @@
package blobstor_test

import (
"crypto/rand"
"fmt"
"path/filepath"
"sync"
"testing"
"time"

bbczt "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func benchmarkPutMN(b *testing.B, depth, width uint64, parallel bool) {
nBlobovniczas := uint64(1)
for i := uint64(1); i <= depth+1; i++ {
nBlobovniczas *= width
}

const objSizeLimit = 4 << 10
const fullSizeLimit = 100 << 20

bbcz := bbczt.NewBlobovniczaTree(
bbczt.WithRootPath(b.TempDir()),
bbczt.WithObjectSizeLimit(objSizeLimit),
bbczt.WithBlobovniczaSize(fullSizeLimit/nBlobovniczas),
bbczt.WithBlobovniczaShallowWidth(width),
bbczt.WithBlobovniczaShallowDepth(depth),
)

require.NoError(b, bbcz.Open(false))
b.Cleanup(func() { _ = bbcz.Close() })
require.NoError(b, bbcz.Init())

benchmark(b, bbcz, objSizeLimit, 20)
}

func BenchmarkBlobovniczas_Put(b *testing.B) {
for _, testCase := range []struct {
width, depth uint64
}{
{1, 0},
{10, 0},
{2, 2},
{4, 4},
} {
b.Run(fmt.Sprintf("tree=%dx%d", testCase.width, testCase.depth), func(b *testing.B) {
benchmarkPutMN(b, testCase.depth, testCase.width, false)
})
b.Run(fmt.Sprintf("tree=%dx%d_parallel", testCase.width, testCase.depth), func(b *testing.B) {
benchmarkPutMN(b, testCase.depth, testCase.width, true)
})
}
}

func testPeapodPath(tb testing.TB) string {
return filepath.Join(tb.TempDir(), "peapod.db")
}

func newTestPeapod(tb testing.TB) common.Storage {
return peapod.New(testPeapodPath(tb), 0600, 10*time.Millisecond)
}

func newTestFSTree(tb testing.TB) common.Storage {
return fstree.New(
fstree.WithDepth(4), // Default.
fstree.WithPath(tb.TempDir()),
fstree.WithDirNameLen(1), // Default.
fstree.WithNoSync(false), // Default.
)
}

func benchmark(b *testing.B, p common.Storage, objSize uint64, nThreads int) {
data := make([]byte, objSize)
rand.Read(data)

prm := common.PutPrm{
RawData: data,
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
var wg sync.WaitGroup

for i := 0; i < nThreads; i++ {
wg.Add(1)
go func() {
defer wg.Done()

prm := prm
prm.Address = oidtest.Address()

_, err := p.Put(prm)
require.NoError(b, err)
}()
}

wg.Wait()
}
}

func BenchmarkPut(b *testing.B) {
for _, tc := range []struct {
objSize uint64
nThreads int
}{
{1, 1},
{1, 20},
{1, 100},
{1 << 10, 1},
{1 << 10, 20},
{1 << 10, 100},
{100 << 10, 1},
{100 << 10, 20},
{100 << 10, 100},
} {
b.Run(fmt.Sprintf("size=%d,thread=%d", tc.objSize, tc.nThreads), func(b *testing.B) {
for name, creat := range map[string]func(testing.TB) common.Storage{
"peapod": newTestPeapod,
"fstree": newTestFSTree,
} {
b.Run(name, func(b *testing.B) {
ptt := creat(b)
require.NoError(b, ptt.Open(false))
require.NoError(b, ptt.Init())
b.Cleanup(func() { _ = ptt.Close() })

benchmark(b, ptt, tc.objSize, tc.nThreads)
})
}
})
}
}
85 changes: 0 additions & 85 deletions pkg/local_object_storage/blobstor/blobovniczatree/put_test.go
@@ -1,10 +1,6 @@
package blobovniczatree_test

import (
"crypto/rand"
"errors"
"fmt"
"sync"
"testing"

. "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
@@ -42,84 +38,3 @@ func TestSingleDir(t *testing.T) {
})
require.NoError(t, err)
}

func benchmarkPutMN(b *testing.B, depth, width uint64, parallel bool) {
nBlobovniczas := uint64(1)
for i := uint64(1); i <= depth+1; i++ {
nBlobovniczas *= width
}

const objSizeLimit = 4 << 10
const fullSizeLimit = 100 << 20

bbcz := NewBlobovniczaTree(
WithRootPath(b.TempDir()),
WithObjectSizeLimit(objSizeLimit),
WithBlobovniczaSize(fullSizeLimit/nBlobovniczas),
WithBlobovniczaShallowWidth(width),
WithBlobovniczaShallowDepth(depth),
)

require.NoError(b, bbcz.Open(false))
b.Cleanup(func() { _ = bbcz.Close() })
require.NoError(b, bbcz.Init())

prm := common.PutPrm{
RawData: make([]byte, objSizeLimit),
}

rand.Read(prm.RawData)

var wg sync.WaitGroup

f := func(prm common.PutPrm) {
defer wg.Done()

var err error

for i := 0; i < b.N; i++ {
prm.Address = oidtest.Address()

_, err = bbcz.Put(prm)
if err != nil {
if errors.Is(err, common.ErrNoSpace) {
break
}
require.NoError(b, err)
}
}
}

nRoutine := 1
if parallel {
nRoutine = 20
}

b.ReportAllocs()
b.ResetTimer()

for j := 0; j < nRoutine; j++ {
wg.Add(1)
go f(prm)
}

wg.Wait()
}

func BenchmarkBlobovniczas_Put(b *testing.B) {
for _, testCase := range []struct {
width, depth uint64
}{
{1, 0},
{10, 0},
{2, 2},
{4, 4},
} {
b.Run(fmt.Sprintf("tree=%dx%d", testCase.width, testCase.depth), func(b *testing.B) {
benchmarkPutMN(b, testCase.depth, testCase.width, false)
})
b.Run(fmt.Sprintf("tree=%dx%d_parallel", testCase.width, testCase.depth), func(b *testing.B) {
benchmarkPutMN(b, testCase.depth, testCase.width, true)
})
}
}
100 changes: 1 addition & 99 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
@@ -7,9 +7,7 @@ import (
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
@@ -247,103 +245,7 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
if !prm.DontCompress {
prm.RawData = t.Compress(prm.RawData)
}

// Here is a situation:
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
//
// 1. We put an object on node 1.
// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
// 3. PUT operation started by client at (1) also puts an object here.
// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
// Even more than that, concurrent writes can corrupt data.
//
// So here is a solution:
// 1. Write a file to 'name + 1'.
// 2. If it exists, retry with temporary name being 'name + 2'.
// 3. Set some reasonable number of attempts.
//
// It is a bit kludgy, but I am unusually proud of having found this out after
// hours of research on the Linux kernel, the dirsync mount option and the ext4 FS;
// it turned out to be so hecking simple.
// In a very rare situation we can have multiple partially written copies on disk;
// this will be fixed in another issue (we should remove garbage on start).
const retryCount = 5
for i := 0; i < retryCount; i++ {
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
err := t.writeAndRename(tmpPath, p, prm.RawData)
if err != syscall.EEXIST || i == retryCount-1 {
return common.PutRes{StorageID: []byte{}}, err
}
}

// unreachable, but precaution never hurts, especially 1 day before release.
return common.PutRes{StorageID: []byte{}}, fmt.Errorf("couldn't write file after %d retries", retryCount)
}

// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
err := t.writeFile(tmpPath, data)
if err != nil {
var pe *fs.PathError
if errors.As(err, &pe) {
switch pe.Err {
case syscall.ENOSPC:
err = common.ErrNoSpace
_ = os.RemoveAll(tmpPath)
case syscall.EEXIST:
return syscall.EEXIST
}
}
} else {
err = os.Rename(tmpPath, p)
}
return err
}

func (t *FSTree) writeFlags() int {
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL
if t.noSync {
return flags
}
return flags | os.O_SYNC
}

// writeFile writes data to a file with path p.
// The code is copied from `os.WriteFile` with minor corrections for flags.
func (t *FSTree) writeFile(p string, data []byte) error {
f, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
if err != nil {
return err
}
_, err = f.Write(data)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}
return err
}

// PutStream executes handler on a file opened for write.
func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error {
if t.readOnly {
return common.ErrReadOnly
}

p := t.treePath(addr)

if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
return err
}

f, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
if err != nil {
return err
}
defer f.Close()

return handler(f)
return common.PutRes{StorageID: []byte{}}, t.writeData(p, prm.RawData)
}

// Get returns an object from the storage by address.
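The rewritten Put above ends in a call to t.writeData, whose body lies outside the loaded part of this diff. As a hedged guess at the overall shape (the function below and its use of the earlier sketch are hypothetical, not the commit's code), a dispatcher could take the O_TMPFILE path on Linux and keep the classic temporary-name-plus-rename scheme elsewhere:

package tmpfile

import (
	"os"
	"path/filepath"
	"runtime"
)

// writeData is a hypothetical dispatcher; the commit's real implementation
// is not shown on this page.
func writeData(p string, data []byte) error {
	if runtime.GOOS == "linux" {
		// O_TMPFILE plus linkat, as sketched after the CHANGELOG.md section.
		return writeTmpFile(filepath.Dir(p), p, data)
	}

	// Portable fallback: write to a unique temporary name, then rename
	// into place; rename within one filesystem is atomic on POSIX.
	f, err := os.CreateTemp(filepath.Dir(p), ".tmp-*")
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		os.Remove(f.Name())
		return err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return err
	}
	return os.Rename(f.Name(), p)
}

In real code this platform split would more idiomatically live behind build tags (a _linux.go file plus a generic fallback) rather than a runtime.GOOS check, so that each target only compiles the branch it can actually use.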
(diff for the remaining 4 changed files not loaded)