Skip to content

Commit

Permalink
node: Accept expired locked objects
Browse files Browse the repository at this point in the history
Allow replication of any (expired too) locked object. Information about
object locking is considered to be presented on the _container nodes_.
Refs. nspcc-dev#2392.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Aug 31, 2023
1 parent f9ccab8 commit 62922b2
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 3 deletions.
8 changes: 8 additions & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ type engineWithNotifications struct {
defaultTopic string
}

func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
return e.base.IsLocked(address)
}

func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete)
}
Expand Down Expand Up @@ -562,6 +566,10 @@ type engineWithoutNotifications struct {
engine *engine.StorageEngine
}

func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
return e.engine.IsLocked(address)
}

func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm

Expand Down
33 changes: 32 additions & 1 deletion pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type FormatValidatorOption func(*cfg)

type cfg struct {
netState netmap.State
e LockSource
}

// DeleteHandler is an interface of delete queue processor.
Expand All @@ -34,6 +35,12 @@ type DeleteHandler interface {
DeleteObjects(oid.Address, ...oid.Address) error
}

// LockSource is a source of lock relations between the objects.
type LockSource interface {
// IsLocked must clarify object's lock status.
IsLocked(address oid.Address) (bool, error)
}

// Locker is an object lock storage interface.
type Locker interface {
// Lock list of objects as locked by locker in the specified container.
Expand Down Expand Up @@ -286,7 +293,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
}

if exp < v.netState.CurrentEpoch() {
return errExpired
// an object could be expired but locked;
// put such an object is a correct operation

cID, _ := obj.ContainerID()
oID, _ := obj.ID()

var addr oid.Address
addr.SetContainer(cID)
addr.SetObject(oID)

locked, err := v.e.IsLocked(addr)
if err != nil {
return fmt.Errorf("locking status check for an expired object: %w", err)
}

if !locked {
return errExpired
}
}

return nil
Expand Down Expand Up @@ -347,3 +371,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState
}
}

// WithLockSource return option to set a Locked objects source.
func WithLockSource(e LockSource) FormatValidatorOption {
return func(c *cfg) {
c.e = e
}
}
34 changes: 32 additions & 2 deletions pkg/core/object/fmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,26 @@ func (s testNetState) CurrentEpoch() uint64 {
return s.epoch
}

type testLockSource struct {
m map[oid.Address]bool
}

func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
return t.m[address], nil
}

func TestFormatValidator_Validate(t *testing.T) {
const curEpoch = 13

ls := testLockSource{
m: make(map[oid.Address]bool),
}

v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
)

ownerKey, err := keys.NewPrivateKey()
Expand Down Expand Up @@ -225,8 +238,25 @@ func TestFormatValidator_Validate(t *testing.T) {

t.Run("expired object", func(t *testing.T) {
val := strconv.FormatUint(curEpoch-1, 10)
err := v.Validate(fn(val), false)
require.ErrorIs(t, err, errExpired)
obj := fn(val)

t.Run("non-locked", func(t *testing.T) {
err := v.Validate(obj, false)
require.ErrorIs(t, err, errExpired)
})

t.Run("locked", func(t *testing.T) {
var addr oid.Address
oID, _ := obj.ID()
cID, _ := obj.ContainerID()

addr.SetContainer(cID)
addr.SetObject(oID)
ls.m[addr] = true

err := v.Validate(obj, false)
require.NoError(t, err)
})
})

t.Run("alive object", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/services/object/put/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type ObjectStorage interface {
// Lock must lock passed objects
// and return any appeared error.
Lock(locker oid.Address, toLock []oid.ID) error
// IsLocked must clarify object's lock status.
IsLocked(oid.Address) (bool, error)
}

type localTarget struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/services/object/put/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
func WithObjectStorage(v ObjectStorage) Option {
return func(c *cfg) {
c.localStore = v
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
}
}

Expand Down

0 comments on commit 62922b2

Please sign in to comment.