From f9ccab8fb629b0a4d8e92fd2d1ed2fa35574107f Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 28 Aug 2023 13:54:00 +0300 Subject: [PATCH 1/6] node: Make engine's `IsLocked` public It will allow reusing that method in expiration checks. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/engine/inhume.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index a81a3243d89..5814133712f 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -81,7 +81,7 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) { for i := range prm.addrs { if !prm.forceRemoval { - locked, err := e.isLocked(prm.addrs[i]) + locked, err := e.IsLocked(prm.addrs[i]) if err != nil { e.log.Warn("removing an object without full locking check", zap.Error(err), @@ -179,7 +179,8 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE return ok, retErr } -func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { +// IsLocked checks whether an object is locked according to StorageEngine's state. +func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { var locked bool var err error var outErr error From 62922b2ffe112b95900c3753ee56675337cb9db9 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 28 Aug 2023 19:12:00 +0300 Subject: [PATCH 2/6] node: Accept expired locked objects Allow replication of any (expired too) locked object. Information about object locking is considered to be presented on the _container nodes_. Refs. #2392. Signed-off-by: Pavel Karpy --- cmd/neofs-node/object.go | 8 +++++++ pkg/core/object/fmt.go | 33 ++++++++++++++++++++++++++++- pkg/core/object/fmt_test.go | 34 ++++++++++++++++++++++++++++-- pkg/services/object/put/local.go | 2 ++ pkg/services/object/put/service.go | 1 + 5 files changed, 75 insertions(+), 3 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index a4cf5d9d630..563755812e5 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -529,6 +529,10 @@ type engineWithNotifications struct { defaultTopic string } +func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) { + return e.base.IsLocked(address) +} + func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { return e.base.Delete(tombstone, toDelete) } @@ -562,6 +566,10 @@ type engineWithoutNotifications struct { engine *engine.StorageEngine } +func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) { + return e.engine.IsLocked(address) +} + func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { var prm engine.InhumePrm diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index 27d7e9fae7a..eec165c151d 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -23,6 +23,7 @@ type FormatValidatorOption func(*cfg) type cfg struct { netState netmap.State + e LockSource } // DeleteHandler is an interface of delete queue processor. @@ -34,6 +35,12 @@ type DeleteHandler interface { DeleteObjects(oid.Address, ...oid.Address) error } +// LockSource is a source of lock relations between the objects. +type LockSource interface { + // IsLocked must clarify object's lock status. + IsLocked(address oid.Address) (bool, error) +} + // Locker is an object lock storage interface. type Locker interface { // Lock list of objects as locked by locker in the specified container. @@ -286,7 +293,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error { } if exp < v.netState.CurrentEpoch() { - return errExpired + // an object could be expired but locked; + // put such an object is a correct operation + + cID, _ := obj.ContainerID() + oID, _ := obj.ID() + + var addr oid.Address + addr.SetContainer(cID) + addr.SetObject(oID) + + locked, err := v.e.IsLocked(addr) + if err != nil { + return fmt.Errorf("locking status check for an expired object: %w", err) + } + + if !locked { + return errExpired + } } return nil @@ -347,3 +371,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption { c.netState = netState } } + +// WithLockSource return option to set a Locked objects source. +func WithLockSource(e LockSource) FormatValidatorOption { + return func(c *cfg) { + c.e = e + } +} diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go index ee1d44f8cb6..15109b6e341 100644 --- a/pkg/core/object/fmt_test.go +++ b/pkg/core/object/fmt_test.go @@ -35,13 +35,26 @@ func (s testNetState) CurrentEpoch() uint64 { return s.epoch } +type testLockSource struct { + m map[oid.Address]bool +} + +func (t testLockSource) IsLocked(address oid.Address) (bool, error) { + return t.m[address], nil +} + func TestFormatValidator_Validate(t *testing.T) { const curEpoch = 13 + ls := testLockSource{ + m: make(map[oid.Address]bool), + } + v := NewFormatValidator( WithNetState(testNetState{ epoch: curEpoch, }), + WithLockSource(ls), ) ownerKey, err := keys.NewPrivateKey() @@ -225,8 +238,25 @@ func TestFormatValidator_Validate(t *testing.T) { t.Run("expired object", func(t *testing.T) { val := strconv.FormatUint(curEpoch-1, 10) - err := v.Validate(fn(val), false) - require.ErrorIs(t, err, errExpired) + obj := fn(val) + + t.Run("non-locked", func(t *testing.T) { + err := v.Validate(obj, false) + require.ErrorIs(t, err, errExpired) + }) + + t.Run("locked", func(t *testing.T) { + var addr oid.Address + oID, _ := obj.ID() + cID, _ := obj.ContainerID() + + addr.SetContainer(cID) + addr.SetObject(oID) + ls.m[addr] = true + + err := v.Validate(obj, false) + require.NoError(t, err) + }) }) t.Run("alive object", func(t *testing.T) { diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index da96fd58b3c..90d53ae8f7c 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -20,6 +20,8 @@ type ObjectStorage interface { // Lock must lock passed objects // and return any appeared error. Lock(locker oid.Address, toLock []oid.ID) error + // IsLocked must clarify object's lock status. + IsLocked(oid.Address) (bool, error) } type localTarget struct { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 8ec37f90b4e..c545a0760ff 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -100,6 +100,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option { func WithObjectStorage(v ObjectStorage) Option { return func(c *cfg) { c.localStore = v + c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v)) } } From d9f879ad5c7c81b0597b3d91aad8508801ae8ff6 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 29 Aug 2023 19:04:41 +0300 Subject: [PATCH 3/6] node: Remove expired unlock object by the GC An object can be expired but locked (and not removed, of course). After the LOCK object expiration, the object should be unlocked and, therefore, become unavailable immediately not by the next GC cycle time. Closes #2392. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/local_object_storage/metabase/lock.go | 112 ++++++++++------- .../metabase/lock_test.go | 15 ++- pkg/local_object_storage/metabase/select.go | 98 +++++++++++++++ pkg/local_object_storage/shard/gc.go | 47 ++++++- pkg/local_object_storage/shard/gc_test.go | 119 ++++++++++++++++++ pkg/local_object_storage/shard/shard_test.go | 6 +- 7 files changed, 344 insertions(+), 54 deletions(-) create mode 100644 pkg/local_object_storage/shard/gc_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f1d5c0ad90f..fb7f046e2bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ minor release, the component will be purged, so be prepared (see `Updating` sect - `neofs-lens write-cache list` command duplication (#2505) - `neofs-adm` works with contract wallet in `init` and `update-contracts` commands only (#2134) - Missing removed but locked objects in `SEARCH`'s results (#2526) +- LOCK objects and regular objects expiration conflicts (#2392) ### Removed - Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400) diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 4c167f4a957..a59a31e8af5 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -96,25 +96,28 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error { } // FreeLockedBy unlocks all objects in DB which are locked by lockers. -func (db *DB) FreeLockedBy(lockers []oid.Address) error { +// Returns unlocked objects if any. +func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { - return ErrDegradedMode + return nil, ErrDegradedMode } - return db.boltDB.Update(func(tx *bbolt.Tx) error { - var err error + var unlocked []oid.Address + return unlocked, db.boltDB.Update(func(tx *bbolt.Tx) error { for i := range lockers { - err = freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) + uu, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) if err != nil { return err } + + unlocked = append(unlocked, uu...) } - return err + return nil }) } @@ -134,59 +137,76 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { } // releases all records about the objects locked by the locker. +// Returns unlocked objects (if any). // // Operation is very resource-intensive, which is caused by the admissibility // of multiple locks. Also, if we knew what objects are locked, it would be // possible to speed up the execution. -func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { +func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) { bucketLocked := tx.Bucket(bucketNameLocked) - if bucketLocked != nil { - key := make([]byte, cidSize) - idCnr.Encode(key) + if bucketLocked == nil { + return nil, nil + } - bucketLockedContainer := bucketLocked.Bucket(key) - if bucketLockedContainer != nil { - keyLocker := objectKey(locker, key) - return bucketLockedContainer.ForEach(func(k, v []byte) error { - keyLockers, err := decodeList(v) - if err != nil { - return fmt.Errorf("decode list of lockers in locked bucket: %w", err) - } + key := make([]byte, cidSize) + idCnr.Encode(key) + + bucketLockedContainer := bucketLocked.Bucket(key) + if bucketLockedContainer == nil { + return nil, nil + } + + var unlocked []oid.Address + keyLocker := objectKey(locker, key) - for i := range keyLockers { - if bytes.Equal(keyLockers[i], keyLocker) { - if len(keyLockers) == 1 { - // locker was all alone - err = bucketLockedContainer.Delete(k) - if err != nil { - return fmt.Errorf("delete locked object record from locked bucket: %w", err) - } - } else { - // exclude locker - keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) - - v, err = encodeList(keyLockers) - if err != nil { - return fmt.Errorf("encode updated list of lockers: %w", err) - } - - // update the record - err = bucketLockedContainer.Put(k, v) - if err != nil { - return fmt.Errorf("update list of lockers: %w", err) - } - } - - return nil + return unlocked, bucketLockedContainer.ForEach(func(k, v []byte) error { + keyLockers, err := decodeList(v) + if err != nil { + return fmt.Errorf("decode list of lockers in locked bucket: %w", err) + } + + for i := range keyLockers { + if bytes.Equal(keyLockers[i], keyLocker) { + if len(keyLockers) == 1 { + // locker was all alone + err = bucketLockedContainer.Delete(k) + if err != nil { + return fmt.Errorf("delete locked object record from locked bucket: %w", err) + } + + var oID oid.ID + err = oID.Decode(k) + if err != nil { + return fmt.Errorf("decode unlocked object id error: %w", err) + } + + var addr oid.Address + addr.SetContainer(idCnr) + addr.SetObject(oID) + + unlocked = append(unlocked, addr) + } else { + // exclude locker + keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) + + v, err = encodeList(keyLockers) + if err != nil { + return fmt.Errorf("encode updated list of lockers: %w", err) + } + + // update the record + err = bucketLockedContainer.Put(k, v) + if err != nil { + return fmt.Errorf("update list of lockers: %w", err) } } return nil - }) + } } - } - return nil + return nil + }) } // IsLockedPrm groups the parameters of IsLocked operation. diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index 94a82b4bf15..b010c476e9c 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -109,8 +109,9 @@ func TestDB_Lock(t *testing.T) { require.Len(t, res.DeletedLockObjects(), 1) require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0]) - err = db.FreeLockedBy([]oid.Address{lockAddr}) + unlocked, err := db.FreeLockedBy([]oid.Address{lockAddr}) require.NoError(t, err) + require.ElementsMatch(t, objsToAddrs(objs), unlocked) inhumePrm.SetAddresses(objAddr) inhumePrm.SetGCMark() @@ -140,8 +141,9 @@ func TestDB_Lock(t *testing.T) { // unlock just objects that were locked by // just removed locker - err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]}) + unlocked, err := db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]}) require.NoError(t, err) + require.ElementsMatch(t, objsToAddrs(objs), unlocked) // removing objects after unlock @@ -264,3 +266,12 @@ func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Ob return lockedObjs, lockObj } + +func objsToAddrs(oo []*object.Object) []oid.Address { + res := make([]oid.Address, 0, len(oo)) + for _, o := range oo { + res = append(res, objectcore.AddressOf(o)) + } + + return res +} diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 0b5492f9027..f43bf4be3b2 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "strconv" "strings" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" @@ -560,3 +561,100 @@ func isSystemKey(key string) bool { // FIXME: #1147 version-dependent approach return strings.HasPrefix(key, v2object.ReservedFilterPrefix) } + +// FilterExpired filters expired object from `addresses` and return them. +// Uses internal epoch state provided via the [WithEpochState] option. +func (db *DB) FilterExpired(addresses []oid.Address) ([]oid.Address, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + epoch := db.epochState.CurrentEpoch() + res := make([]oid.Address, 0) + cIDToOIDs := make(map[cid.ID][]oid.ID) + for _, a := range addresses { + cIDToOIDs[a.Container()] = append(cIDToOIDs[a.Container()], a.Object()) + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + for cID, oIDs := range cIDToOIDs { + expired, err := filterExpired(tx, epoch, cID, oIDs) + if err != nil { + return err + } + + for _, oID := range expired { + var a oid.Address + a.SetContainer(cID) + a.SetObject(oID) + + res = append(res, a) + } + } + + return nil + }) + if err != nil { + return nil, err + } + + return res, nil +} + +func filterExpired(tx *bbolt.Tx, epoch uint64, cID cid.ID, oIDs []oid.ID) ([]oid.ID, error) { + objKey := make([]byte, objectKeySize) + expAttr := v2object.SysAttributeExpEpoch + expirationBucketKey := make([]byte, bucketKeySize+len(expAttr)) + + res := make([]oid.ID, 0) + notHandled := sliceToMap(oIDs) + expirationBucket := tx.Bucket(attributeBucketName(cID, expAttr, expirationBucketKey)) + if expirationBucket == nil { + return nil, nil + } + + err := expirationBucket.ForEach(func(expBktKey, _ []byte) error { + exp, err := strconv.ParseUint(string(expBktKey), 10, 64) + if err != nil { + return fmt.Errorf("could not parse expiration epoch: %w", err) + } else if exp >= epoch { + return nil + } + + epochExpirationBucket := expirationBucket.Bucket(expBktKey) + if epochExpirationBucket == nil { + return nil + } + + for oID := range notHandled { + key := objectKey(oID, objKey) + if epochExpirationBucket.Get(key) != nil { + delete(notHandled, oID) + res = append(res, oID) + } + } + + if len(notHandled) == 0 { + return errBreakBucketForEach + } + + return nil + }) + if err != nil && !errors.Is(err, errBreakBucketForEach) { + return nil, err + } + + return res, nil +} + +func sliceToMap[V comparable](s []V) map[V]struct{} { + res := make(map[V]struct{}, len(s)) + for _, v := range s { + res[v] = struct{}{} + } + + return res +} diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index ef73f5f8814..0d46e45c7de 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -416,13 +416,16 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { } // HandleExpiredLocks unlocks all objects which were locked by lockers. -// If successful, marks lockers themselves as garbage. +// If successful, marks lockers themselves as garbage. Also, marks as +// garbage every object that becomes free-to-remove and just removed +// lock object is the only reason for that object to be alive (e.g. +// expired but locked objects). func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { if s.GetMode().NoMetabase() { return } - err := s.metaBase.FreeLockedBy(lockers) + unlocked, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn("failure to unlock objects", zap.String("error", err.Error()), @@ -431,8 +434,17 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { return } + expired, err := s.metaBase.FilterExpired(unlocked) + if err != nil { + s.log.Warn("expired object filtering", + zap.Error(err), + ) + + return + } + var pInhume meta.InhumePrm - pInhume.SetAddresses(lockers...) + pInhume.SetAddresses(append(lockers, expired...)...) pInhume.SetGCMark() res, err := s.metaBase.Inhume(pInhume) @@ -448,12 +460,15 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { } // HandleDeletedLocks unlocks all objects which were locked by lockers. +// Also, marks as garbage every object that becomes free-to-remove and +// just removed lock object is the only reason for that object to be +// alive (e.g. expired but locked objects). func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { if s.GetMode().NoMetabase() { return } - err := s.metaBase.FreeLockedBy(lockers) + unlocked, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn("failure to unlock objects", zap.String("error", err.Error()), @@ -461,6 +476,30 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { return } + + expired, err := s.metaBase.FilterExpired(unlocked) + if err != nil { + s.log.Warn("expired object filtering", + zap.Error(err), + ) + + return + } + + var pInhume meta.InhumePrm + pInhume.SetAddresses(expired...) + pInhume.SetGCMark() + + res, err := s.metaBase.Inhume(pInhume) + if err != nil { + s.log.Warn("failure to mark unlocked objects as garbage", + zap.String("error", err.Error()), + ) + + return + } + + s.decObjectCounterBy(logical, res.AvailableInhumed()) } // NotificationChannel returns channel for shard events. diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go new file mode 100644 index 00000000000..bf7ab44921b --- /dev/null +++ b/pkg/local_object_storage/shard/gc_test.go @@ -0,0 +1,119 @@ +package shard_test + +import ( + "context" + "path/filepath" + "testing" + "time" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) { + var sh *shard.Shard + + epoch := &epochState{ + Value: 10, + } + + rootPath := t.TempDir() + opts := []shard.Option{ + shard.WithLogger(zap.NewNop()), + shard.WithBlobStorOptions( + blobstor.WithStorages([]blobstor.SubStorage{ + { + Storage: peapod.New( + filepath.Join(rootPath, "blob", "peapod"), + 0600, + time.Second, + ), + Policy: func(_ *object.Object, data []byte) bool { + return len(data) <= 1<<20 + }, + }, + { + Storage: fstree.New( + fstree.WithPath(filepath.Join(rootPath, "blob"))), + }, + }), + ), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(rootPath, "meta")), + meta.WithEpochState(epoch), + ), + shard.WithDeletedLockCallback(func(_ context.Context, aa []oid.Address) { + sh.HandleDeletedLocks(aa) + }), + shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) { + sh.HandleExpiredLocks(aa) + }), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + require.NoError(t, err) + + return pool + }), + } + + sh = shard.New(opts...) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + t.Cleanup(func() { + releaseShard(sh, t) + }) + + cnr := cidtest.ID() + + var expAttr objectSDK.Attribute + expAttr.SetKey(objectV2.SysAttributeExpEpoch) + expAttr.SetValue("1") + + obj := generateObjectWithCID(t, cnr) + obj.SetAttributes(expAttr) + objID, _ := obj.ID() + + expAttr.SetValue("3") + + lock := generateObjectWithCID(t, cnr) + lock.SetType(object.TypeLock) + lock.SetAttributes(expAttr) + lockID, _ := lock.ID() + + var putPrm shard.PutPrm + putPrm.SetObject(obj) + + _, err := sh.Put(putPrm) + require.NoError(t, err) + + err = sh.Lock(cnr, lockID, []oid.ID{objID}) + require.NoError(t, err) + + putPrm.SetObject(lock) + _, err = sh.Put(putPrm) + require.NoError(t, err) + + epoch.Value = 5 + sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value) + + var getPrm shard.GetPrm + getPrm.SetAddress(objectCore.AddressOf(obj)) + require.Eventually(t, func() bool { + _, err = sh.Get(getPrm) + return shard.IsErrNotFound(err) + }, 3*time.Second, 1*time.Second, "lock expiration should free object removal") +} diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 6fa978a01e9..7dd04361b0c 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -26,10 +26,12 @@ import ( "go.uber.org/zap/zaptest" ) -type epochState struct{} +type epochState struct { + Value uint64 +} func (s epochState) CurrentEpoch() uint64 { - return 0 + return s.Value } func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { From e12d1e060646de69d89536459c933c153d61f3ca Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 31 Aug 2023 17:37:00 +0300 Subject: [PATCH 4/6] node: Fix expired LOCK object removal Locked objects are prohibited to be removed by a user operation so `Force` should be applied to the expired LOCK objects. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/shard/gc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 0d46e45c7de..0ab09c1055f 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -445,7 +445,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { var pInhume meta.InhumePrm pInhume.SetAddresses(append(lockers, expired...)...) - pInhume.SetGCMark() + pInhume.SetForceGCMark() res, err := s.metaBase.Inhume(pInhume) if err != nil { From 065846a83ffd1e6dcffc6dcb836cc311ebd15292 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 31 Aug 2023 23:58:00 +0300 Subject: [PATCH 5/6] meta: Do not change bucket while performing `ForEach` See https://github.com/boltdb/bolt/pull/428. Signed-off-by: Pavel Karpy --- pkg/local_object_storage/metabase/lock.go | 39 ++++++++++++++++------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index a59a31e8af5..676008ddcd7 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -136,6 +136,11 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { return false } +type kv struct { + k []byte + v []byte +} + // releases all records about the objects locked by the locker. // Returns unlocked objects (if any). // @@ -157,9 +162,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres } var unlocked []oid.Address + var bktChanges []kv keyLocker := objectKey(locker, key) - return unlocked, bucketLockedContainer.ForEach(func(k, v []byte) error { + err := bucketLockedContainer.ForEach(func(k, v []byte) error { keyLockers, err := decodeList(v) if err != nil { return fmt.Errorf("decode list of lockers in locked bucket: %w", err) @@ -168,11 +174,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres for i := range keyLockers { if bytes.Equal(keyLockers[i], keyLocker) { if len(keyLockers) == 1 { - // locker was all alone - err = bucketLockedContainer.Delete(k) - if err != nil { - return fmt.Errorf("delete locked object record from locked bucket: %w", err) - } + bktChanges = append(bktChanges, kv{k: k, v: nil}) var oID oid.ID err = oID.Decode(k) @@ -194,11 +196,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres return fmt.Errorf("encode updated list of lockers: %w", err) } - // update the record - err = bucketLockedContainer.Put(k, v) - if err != nil { - return fmt.Errorf("update list of lockers: %w", err) - } + bktChanges = append(bktChanges, kv{k: k, v: v}) } return nil @@ -207,6 +205,25 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres return nil }) + if err != nil { + return nil, fmt.Errorf("iterating lockers: %w", err) + } + + for _, kv := range bktChanges { + if kv.v == nil { + err = bucketLockedContainer.Delete(kv.k) + if err != nil { + return nil, fmt.Errorf("delete locked object record from locked bucket: %w", err) + } + } else { + err = bucketLockedContainer.Put(kv.k, kv.v) + if err != nil { + return nil, fmt.Errorf("update list of lockers: %w", err) + } + } + } + + return unlocked, nil } // IsLockedPrm groups the parameters of IsLocked operation. From 48fbe5c5908e057f35c4f9de1690f705c20c914e Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 7 Sep 2023 22:20:00 +0300 Subject: [PATCH 6/6] engine: Check block status before `IsLocked` call Signed-off-by: Pavel Karpy --- pkg/local_object_storage/engine/inhume.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 5814133712f..f4a2f299f05 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -181,6 +181,18 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE // IsLocked checks whether an object is locked according to StorageEngine's state. func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { + var res bool + var err error + + err = e.execIfNotBlocked(func() error { + res, err = e.isLocked(addr) + return err + }) + + return res, err +} + +func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { var locked bool var err error var outErr error