Skip to content

Commit

Permalink
fix/Removed object listing (#2526)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Aug 29, 2023
2 parents 5252a82 + 3ec865e commit fb7dd3f
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 73 deletions.
142 changes: 72 additions & 70 deletions CHANGELOG.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/neofs-cli/internal/common/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func ExitOnErr(cmd *cobra.Command, errFmt string, err error) {
internal
aclDenied
awaitTimeout
alreadyRemoved
)

var code int
var accessErr = new(sdkstatus.ObjectAccessDenied)
var alreadyRemovedErr = new(sdkstatus.ObjectAlreadyRemoved)

switch {
case errors.Is(err, sdkstatus.ErrServerInternal):
Expand All @@ -46,6 +48,8 @@ func ExitOnErr(cmd *cobra.Command, errFmt string, err error) {
err = fmt.Errorf("%w: %s", err, accessErr.Reason())
case errors.Is(err, ErrAwaitTimeout):
code = awaitTimeout
case errors.As(err, alreadyRemovedErr):
code = alreadyRemoved
default:
code = internal
}
Expand Down
1 change: 1 addition & 0 deletions docs/cli-exit-codes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The NeoFS CLI returns specific exit codes to indicate the outcome of command exe
| 1 | Internal error or an unspecified failure. |
| 2 | Object access denied or unauthorized. |
| 3 | Await timeout expired for a certain condition. |
| 4 | Object already removed. |



Expand Down
18 changes: 15 additions & 3 deletions pkg/local_object_storage/metabase/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,29 +106,41 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
// expired previously for less than the one epoch duration

var expired bool
oID := addr.Object()
cID := addr.Container()

// bucket with objects that have expiration attr
attrKey := make([]byte, bucketKeySize+len(objectV2.SysAttributeExpEpoch))
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), objectV2.SysAttributeExpEpoch, attrKey))
expirationBucket := tx.Bucket(attributeBucketName(cID, objectV2.SysAttributeExpEpoch, attrKey))
if expirationBucket != nil {
// bucket that contains objects that expire in the current epoch
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
if prevEpochBkt != nil {
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
rawOID := objectKey(oID, make([]byte, objectKeySize))
if prevEpochBkt.Get(rawOID) != nil {
expired = true
}
}
}

if expired {
if objectLocked(tx, cID, oID) {
return 0
}

return 3
}

graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
addrKey := addressKey(addr, make([]byte, addressKeySize))
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)

removedStatus := inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
if removedStatus != 0 && objectLocked(tx, cID, oID) {
return 0
}

return removedStatus
}

func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
Expand Down
21 changes: 21 additions & 0 deletions pkg/local_object_storage/metabase/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,27 @@ func TestDB_IsLocked(t *testing.T) {
require.False(t, res.Locked())
}

func TestDB_Lock_Expired(t *testing.T) {
es := &epochState{e: 123}

db := newDB(t, meta.WithEpochState(es))

// put an object
addr := putWithExpiration(t, db, object.TypeRegular, 124)

// expire the obj
es.e = 125
_, err := metaGet(db, addr, false)
require.ErrorIs(t, err, meta.ErrObjectIsExpired)

// lock the obj
require.NoError(t, db.Lock(addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))

// object is expired but locked, thus, must be available
_, err = metaGet(db, addr, false)
require.NoError(t, err)
}

// putAndLockObj puts object, returns it and its locker.
func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Object, *object.Object) {
cnr := cidtest.ID()
Expand Down
49 changes: 49 additions & 0 deletions pkg/local_object_storage/metabase/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,55 @@ func TestExpiredObjects(t *testing.T) {
})
}

func TestRemovedObjects(t *testing.T) {
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))

cnr := cidtest.ID()

o1 := generateObjectWithCID(t, cnr)
addAttribute(o1, "1", "11")

o2 := generateObjectWithCID(t, cnr)
addAttribute(o2, "2", "22")

o3 := generateObjectWithCID(t, cnr) // expired but will be locked
setExpiration(o3, currEpoch-1)

require.NoError(t, putBig(db, o1))
require.NoError(t, putBig(db, o2))
require.NoError(t, putBig(db, o3))

f1 := objectSDK.SearchFilters{}
f1.AddFilter("1", "11", objectSDK.MatchStringEqual)

f2 := objectSDK.SearchFilters{}
f2.AddFilter("2", "22", objectSDK.MatchStringEqual)

fAll := objectSDK.SearchFilters{}

testSelect(t, db, cnr, f1, object.AddressOf(o1))
testSelect(t, db, cnr, f2, object.AddressOf(o2))
testSelect(t, db, cnr, fAll, object.AddressOf(o1), object.AddressOf(o2))

// Removed object

ts1 := generateObject(t)
require.NoError(t, metaInhume(db, object.AddressOf(o1), object.AddressOf(ts1)))

oo, err := metaSelect(db, cnr, f1)
require.NoError(t, err)
require.Empty(t, oo)

testSelect(t, db, cnr, fAll, object.AddressOf(o2))

// Expired (== removed) but locked

l := generateObject(t)
require.NoError(t, db.Lock(cnr, object.AddressOf(l).Object(), []oid.ID{object.AddressOf(o3).Object()}))

testSelect(t, db, cnr, fAll, object.AddressOf(o2), object.AddressOf(o3))
}

func benchmarkSelect(b *testing.B, db *meta.DB, cid cidSDK.ID, fs objectSDK.SearchFilters, expected int) {
var prm meta.SelectPrm
prm.SetContainerID(cid)
Expand Down

0 comments on commit fb7dd3f

Please sign in to comment.