Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/Removed object listing #2526

Merged
merged 6 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 72 additions & 70 deletions CHANGELOG.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/neofs-cli/internal/common/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func ExitOnErr(cmd *cobra.Command, errFmt string, err error) {
internal
aclDenied
awaitTimeout
alreadyRemoved
)

var code int
var accessErr = new(sdkstatus.ObjectAccessDenied)
var alreadyRemovedErr = new(sdkstatus.ObjectAlreadyRemoved)

switch {
case errors.Is(err, sdkstatus.ErrServerInternal):
Expand All @@ -46,6 +48,8 @@ func ExitOnErr(cmd *cobra.Command, errFmt string, err error) {
err = fmt.Errorf("%w: %s", err, accessErr.Reason())
case errors.Is(err, ErrAwaitTimeout):
code = awaitTimeout
case errors.As(err, alreadyRemovedErr):
code = alreadyRemoved
default:
code = internal
}
Expand Down
1 change: 1 addition & 0 deletions docs/cli-exit-codes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The NeoFS CLI returns specific exit codes to indicate the outcome of command exe
| 1 | Internal error or an unspecified failure. |
| 2 | Object access denied or unauthorized. |
| 3 | Await timeout expired for a certain condition. |
| 4 | Object already removed. |



Expand Down
18 changes: 15 additions & 3 deletions pkg/local_object_storage/metabase/exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,29 +106,41 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
// expired previously for less than the one epoch duration

var expired bool
oID := addr.Object()
cID := addr.Container()

// bucket with objects that have expiration attr
attrKey := make([]byte, bucketKeySize+len(objectV2.SysAttributeExpEpoch))
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), objectV2.SysAttributeExpEpoch, attrKey))
expirationBucket := tx.Bucket(attributeBucketName(cID, objectV2.SysAttributeExpEpoch, attrKey))
if expirationBucket != nil {
// bucket that contains objects that expire in the current epoch
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
if prevEpochBkt != nil {
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
rawOID := objectKey(oID, make([]byte, objectKeySize))
if prevEpochBkt.Get(rawOID) != nil {
expired = true
}
}
}

if expired {
if objectLocked(tx, cID, oID) {
return 0
}

return 3
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
}

graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
addrKey := addressKey(addr, make([]byte, addressKeySize))
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)

removedStatus := inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
if removedStatus != 0 && objectLocked(tx, cID, oID) {
return 0
}

return removedStatus
}

func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
Expand Down
21 changes: 21 additions & 0 deletions pkg/local_object_storage/metabase/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,27 @@ func TestDB_IsLocked(t *testing.T) {
require.False(t, res.Locked())
}

func TestDB_Lock_Expired(t *testing.T) {
es := &epochState{e: 123}

db := newDB(t, meta.WithEpochState(es))

// put an object
addr := putWithExpiration(t, db, object.TypeRegular, 124)

// expire the obj
es.e = 125
_, err := metaGet(db, addr, false)
require.ErrorIs(t, err, meta.ErrObjectIsExpired)

// lock the obj
require.NoError(t, db.Lock(addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))

// object is expired but locked, thus, must be available
_, err = metaGet(db, addr, false)
require.NoError(t, err)
}

// putAndLockObj puts object, returns it and its locker.
func putAndLockObj(t *testing.T, db *meta.DB, numOfLockedObjs int) ([]*object.Object, *object.Object) {
cnr := cidtest.ID()
Expand Down
49 changes: 49 additions & 0 deletions pkg/local_object_storage/metabase/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,55 @@ func TestExpiredObjects(t *testing.T) {
})
}

func TestRemovedObjects(t *testing.T) {
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))

cnr := cidtest.ID()

o1 := generateObjectWithCID(t, cnr)
addAttribute(o1, "1", "11")

o2 := generateObjectWithCID(t, cnr)
addAttribute(o2, "2", "22")

o3 := generateObjectWithCID(t, cnr) // expired but will be locked
setExpiration(o3, currEpoch-1)

require.NoError(t, putBig(db, o1))
require.NoError(t, putBig(db, o2))
require.NoError(t, putBig(db, o3))

f1 := objectSDK.SearchFilters{}
f1.AddFilter("1", "11", objectSDK.MatchStringEqual)

f2 := objectSDK.SearchFilters{}
f2.AddFilter("2", "22", objectSDK.MatchStringEqual)

fAll := objectSDK.SearchFilters{}

testSelect(t, db, cnr, f1, object.AddressOf(o1))
testSelect(t, db, cnr, f2, object.AddressOf(o2))
testSelect(t, db, cnr, fAll, object.AddressOf(o1), object.AddressOf(o2))

// Removed object

ts1 := generateObject(t)
require.NoError(t, metaInhume(db, object.AddressOf(o1), object.AddressOf(ts1)))

oo, err := metaSelect(db, cnr, f1)
require.NoError(t, err)
require.Empty(t, oo)

testSelect(t, db, cnr, fAll, object.AddressOf(o2))

// Expired (== removed) but locked

l := generateObject(t)
require.NoError(t, db.Lock(cnr, object.AddressOf(l).Object(), []oid.ID{object.AddressOf(o3).Object()}))

testSelect(t, db, cnr, fAll, object.AddressOf(o2), object.AddressOf(o3))
}

func benchmarkSelect(b *testing.B, db *meta.DB, cid cidSDK.ID, fs objectSDK.SearchFilters, expected int) {
var prm meta.SelectPrm
prm.SetContainerID(cid)
Expand Down
Loading