Skip to content

Commit

Permalink
meta: Do not change bucket while performing ForEach
Browse files Browse the repository at this point in the history
See boltdb/bolt#428.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Aug 31, 2023
1 parent 087b2f7 commit 037ee9f
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions pkg/local_object_storage/metabase/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
return false
}

type kv struct {
k []byte
v []byte
}

// releases all records about the objects locked by the locker.
// Returns unlocked objects (if any).
//
Expand All @@ -157,9 +162,10 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
}

var unlocked []oid.Address
var bktChanges []kv
keyLocker := objectKey(locker, key)

return unlocked, bucketLockedContainer.ForEach(func(k, v []byte) error {
err := bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
Expand All @@ -168,11 +174,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
// locker was all alone
err = bucketLockedContainer.Delete(k)
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
bktChanges = append(bktChanges, kv{k: k, v: nil})

var oID oid.ID
err = oID.Decode(k)
Expand All @@ -194,11 +196,7 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres
return fmt.Errorf("encode updated list of lockers: %w", err)
}

// update the record
err = bucketLockedContainer.Put(k, v)
if err != nil {
return fmt.Errorf("update list of lockers: %w", err)
}
bktChanges = append(bktChanges, kv{k: k, v: v})
}

return nil
Expand All @@ -207,6 +205,25 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Addres

return nil
})
if err != nil {
return nil, fmt.Errorf("iterating lockers: %w", err)
}

for _, kv := range bktChanges {
if kv.v == nil {
err = bucketLockedContainer.Delete(kv.k)
if err != nil {
return nil, fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
err = bucketLockedContainer.Put(kv.k, kv.v)
if err != nil {
return nil, fmt.Errorf("update list of lockers: %w", err)
}
}
}

return unlocked, nil
}

// IsLockedPrm groups the parameters of IsLocked operation.
Expand Down

0 comments on commit 037ee9f

Please sign in to comment.