Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix operations with storage groups #2491

Merged
merged 7 commits into from
Aug 15, 2023
3 changes: 3 additions & 0 deletions pkg/services/object/put/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (x *slicingTarget) WriteHeader(hdr *object.Object) error {
if x.sessionToken != nil {
opts.SetSession(*x.sessionToken)
}
if !x.homoHashDisabled {
opts.CalculateHomomorphicChecksum()
}

var err error
x.payloadWriter, err = slicer.InitPut(x.ctx, &readyObjectWriter{
Expand Down
7 changes: 6 additions & 1 deletion pkg/services/object/put/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
}

homomorphicChecksumRequired := !prm.cnr.IsHomomorphicHashingDisabled()

if prm.hdr.Signature() != nil {
p.relay = prm.relay

Expand All @@ -75,6 +77,8 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
fmt: p.fmtValidator,

maxPayloadSz: p.maxPayloadSz,

homomorphicChecksumRequired: homomorphicChecksumRequired,
}

return nil
Expand Down Expand Up @@ -122,14 +126,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
nextTarget: newSlicingTarget(
p.ctx,
p.maxPayloadSz,
prm.cnr.IsHomomorphicHashingDisabled(),
!homomorphicChecksumRequired,
user.NewAutoIDSigner(*sessionKey),
sToken,
p.networkState.CurrentEpoch(),
func() transformer.ObjectTarget {
return p.newCommonTarget(prm)
},
),
homomorphicChecksumRequired: homomorphicChecksumRequired,
}

return nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/services/object/put/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type validatingTarget struct {
payloadSz uint64 // payload size of the streaming object from header

writtenPayload uint64 // number of already written payload bytes

homomorphicChecksumRequired bool
}

var (
Expand All @@ -55,6 +57,20 @@ func (t *validatingTarget) WriteHeader(obj *objectSDK.Object) error {
return ErrExceedingMaxSize
}

if t.homomorphicChecksumRequired {
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
cs, csSet := obj.PayloadHomomorphicHash()
switch {
case !csSet:
return errors.New("missing homomorphic payload checksum")
case cs.Type() != checksum.TZ:
return fmt.Errorf("wrong/unsupported type of homomorphic payload checksum: %s instead of %s",
cs.Type(), checksum.TZ)
case len(cs.Value()) != tz.Size:
return fmt.Errorf("invalid/unsupported length of %s homomorphic payload checksum: %d instead of %d",
cs.Type(), len(cs.Value()), tz.Size)
}
}

cs, csSet := obj.PayloadChecksum()
if !csSet {
return errors.New("missing payload checksum")
Expand Down
32 changes: 28 additions & 4 deletions pkg/services/object_manager/storagegroup/collect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package storagegroup

import (
"errors"
"fmt"

objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand All @@ -10,6 +13,12 @@ import (
"github.com/nspcc-dev/tzhash/tz"
)

var (
errMissingHomomorphicChecksum = errors.New("missing homomorphic checksum in member's child header")
errInvalidHomomorphicChecksum = errors.New("invalid homomorphic checksum in member's child header")
errMissingSplitMemberID = errors.New("missing object ID in member's child header")
)

// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
//
Expand All @@ -28,22 +37,37 @@ func CollectMembers(r objutil.HeadReceiver, cnr cid.ID, members []oid.ID, calcHo
for i := range members {
addr.SetObject(members[i])

if err := objutil.IterateAllSplitLeaves(r, addr, func(leaf *object.Object) {
var errMember error

if err := objutil.IterateSplitLeaves(r, addr, func(leaf *object.Object) bool {
id, ok := leaf.ID()
if !ok {
return
errMember = errMissingSplitMemberID
return true
}

phyMembers = append(phyMembers, id)
sumPhySize += leaf.PayloadSize()
cs, _ := leaf.PayloadHomomorphicHash()
cs, csSet := leaf.PayloadHomomorphicHash()

if calcHomoHash {
if !csSet {
errMember = fmt.Errorf("%w '%s'", errMissingHomomorphicChecksum, id)
return true
} else if cs.Type() != checksum.TZ {
errMember = fmt.Errorf("%w: type '%s' instead of '%s'", errInvalidHomomorphicChecksum, cs.Type(), checksum.TZ)
return true
}
phyHashes = append(phyHashes, cs.Value())
}

return false
}); err != nil {
return nil, err
}
if errMember != nil {
return nil, fmt.Errorf("collect split-chain for member #%d: %w", i, errMember)
}
}

sg.SetMembers(phyMembers)
Expand All @@ -52,7 +76,7 @@ func CollectMembers(r objutil.HeadReceiver, cnr cid.ID, members []oid.ID, calcHo
if calcHomoHash {
sumHash, err := tz.Concat(phyHashes)
if err != nil {
return nil, err
return nil, fmt.Errorf("concatenate '%s' checksums of all members: %w", checksum.TZ, err)
roman-khimov marked this conversation as resolved.
Show resolved Hide resolved
}

var cs checksum.Checksum
Expand Down
60 changes: 60 additions & 0 deletions pkg/services/object_manager/storagegroup/collect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package storagegroup

import (
"crypto/sha256"
"testing"

"github.com/nspcc-dev/neofs-sdk-go/checksum"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

type mockedObjects struct {
hdr *object.Object
}

func (x *mockedObjects) Head(_ oid.Address) (any, error) {
return x.hdr, nil
}

func TestCollectMembers(t *testing.T) {
t.Run("missing member's child homomorphic checksum", func(t *testing.T) {
var child object.Object
child.SetID(oidtest.ID())

src := &mockedObjects{hdr: &child}

_, err := CollectMembers(src, cidtest.ID(), []oid.ID{oidtest.ID()}, true)
require.ErrorIs(t, err, errMissingHomomorphicChecksum)
})

t.Run("invalid member's child homomorphic checksum", func(t *testing.T) {
var child object.Object
child.SetID(oidtest.ID())

var cs checksum.Checksum
cs.SetSHA256([sha256.Size]byte{1}) // any non-homomorphic

child.SetPayloadHomomorphicHash(cs)

src := &mockedObjects{hdr: &child}

_, err := CollectMembers(src, cidtest.ID(), []oid.ID{oidtest.ID()}, true)
require.ErrorIs(t, err, errInvalidHomomorphicChecksum)
})

t.Run("missing member's child ID", func(t *testing.T) {
var child object.Object

_, ok := child.ID()
require.False(t, ok)

src := &mockedObjects{hdr: &child}

_, err := CollectMembers(src, cidtest.ID(), []oid.ID{oidtest.ID()}, false)
require.ErrorIs(t, err, errMissingSplitMemberID)
})
}
Loading