Skip to content

Commit

Permalink
fix: do not rename files on mmap failure (#23396) (#23421)
Browse files Browse the repository at this point in the history
If NewTSMReader() fails because mmap fails, do not
rename the file, because the error is probably
caused by vm.max_map_count being too low

closes #23172

(cherry picked from commit ec412f7)

closes #23413
  • Loading branch information
davidby-influx authored Jun 10, 2022
1 parent 3635e1c commit b22fe17
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 21 deletions.
3 changes: 1 addition & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}

fs := NewFileStore(path)
fs := NewFileStore(path, WithMadviseWillNeed(opt.Config.TSMWillNeed))
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))

Expand Down
42 changes: 27 additions & 15 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ type FileStore struct {
currentGeneration int
dir string

files []TSMFile
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
files []TSMFile
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.

logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
Expand All @@ -196,6 +195,8 @@ type FileStore struct {
obs tsdb.FileStoreObserver

copyFiles bool

readerOptions []tsmReaderOption
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -232,7 +233,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}

// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string) *FileStore {
func NewFileStore(dir string, options ...tsmReaderOption) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
Expand All @@ -248,6 +249,7 @@ func NewFileStore(dir string) *FileStore {
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
readerOptions: options,
}
fs.purger.fileStore = fs
return fs
Expand Down Expand Up @@ -554,26 +556,36 @@ func (f *FileStore) Open() error {
defer f.openLimiter.Release()

start := time.Now()
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
df, err := NewTSMReader(file, f.readerOptions...)
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))

// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
// If we are unable to read a TSM file then log the error.
if err != nil {
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
file.Close()
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
if errors.Is(err, MmapError{}) {
// An MmapError may indicate we have insufficient
// handles for the mmap call, in which case the file should
// be left untouched, and the vm.max_map_count be raised.
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
return
} else {
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
return
}

df.WithObserver(f.obs)
readerC <- &res{r: df}
}(i, file)
Expand Down Expand Up @@ -793,7 +805,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
}

tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
tsm, err := NewTSMReader(fd, f.readerOptions...)
if err != nil {
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
Expand Down
109 changes: 109 additions & 0 deletions tsdb/engine/tsm1/file_store_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package tsm1

import (
"github.com/influxdata/influxdb/tsdb"
)

var TestMmapInitFailOption = func(err error) tsmReaderOption {
return func(r *TSMReader) {
r.accessor = &badBlockAccessor{error: err}
}
}

type badBlockAccessor struct {
error
}

func (b *badBlockAccessor) init() (*indirectIndex, error) {
return nil, b.error
}

func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) rename(path string) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) path() string {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) close() error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) free() error {
//TODO implement me
panic("implement me")
}
38 changes: 38 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
)

func TestFileStore_Read(t *testing.T) {
Expand Down Expand Up @@ -2375,6 +2376,43 @@ func TestFileStore_Open(t *testing.T) {
}
}

func TestFileStore_OpenFail(t *testing.T) {
var err error
dir := MustTempDir()
defer func() { assert.NoError(t, os.RemoveAll(dir), "failed to remove temporary directory") }()

// Create a TSM file...
data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}

files, err := newFileDir(dir, data)
if err != nil {
fatal(t, "creating test files", err)
}
assert.Equal(t, 1, len(files))
f := files[0]

const mmapErrMsg = "mmap failure in test"
const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
// With an mmap failure, the files should all be left where they are, because they are not corrupt
openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")

// With a non-mmap failure, the file failing to open should be moved aside
const otherErrMsg = "some Random Init Failure"
openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
}

func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
fs := tsm1.NewFileStore(dir, tsm1.TestMmapInitFailOption(initErr))
err := fs.Open()
assert.Error(t, err)
assert.Contains(t, err.Error(), fullErrMsg)
defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()
assert.Equal(t, 0, fs.Count(), "file count mismatch")
}

func TestFileStore_Remove(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
Expand Down
31 changes: 27 additions & 4 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
}
}

// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure
// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
t := &TSMReader{}
Expand All @@ -231,9 +232,11 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
if t.accessor == nil {
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
}
}

index, err := t.accessor.init()
Expand Down Expand Up @@ -1341,6 +1344,24 @@ func verifyVersion(r io.Reader) error {
return nil
}

type MmapError struct {
error
}

func (m *MmapError) Unwrap() error {
return m.error
}

func (m MmapError) Is(e error) bool {
_, oks := e.(MmapError)
_, okp := e.(*MmapError)
return oks || okp
}

func NewMmapError(e error) MmapError {
return MmapError{error: e}
}

func (m *mmapAccessor) init() (*indirectIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -1366,7 +1387,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {

m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return nil, err
// Wrap the error to let callers know this was an error
// from mmap, and may indicate vm.max_map_count is too low
return nil, NewMmapError(err)
}
if len(m.b) < 8 {
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
Expand Down

0 comments on commit b22fe17

Please sign in to comment.