Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: re-read block file while falling back to different decompression method #172

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions backupbackingimage/backupbackingimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,10 @@ func restoreBlock(bsDriver backupstore.BackupStoreDriver, backingImageFile *os.F

func restoreBlockToFile(bsDriver backupstore.BackupStoreDriver, backingImageFile *os.File, decompression string, blk common.BlockMapping) error {
blkFile := getBackingImageBlockFilePath(blk.BlockChecksum)
rc, err := bsDriver.Read(blkFile)
r, err := backupstore.DecompressAndVerifyWithFallback(bsDriver, blkFile, decompression, blk.BlockChecksum)
if err != nil {
return err
}
defer rc.Close()
r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
if r == nil {
return err
}
}

if _, err := backingImageFile.Seek(blk.Offset, 0); err != nil {
return err
Expand Down
12 changes: 2 additions & 10 deletions deltablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,17 +775,9 @@ func RestoreDeltaBlockBackup(ctx context.Context, config *DeltaRestoreConfig) er

func restoreBlockToFile(bsDriver BackupStoreDriver, volumeName string, volDev *os.File, decompression string, blk BlockMapping) error {
blkFile := getBlockFilePath(volumeName, blk.BlockChecksum)
rc, err := bsDriver.Read(blkFile)
r, err := DecompressAndVerifyWithFallback(bsDriver, blkFile, decompression, blk.BlockChecksum)
if err != nil {
return errors.Wrapf(err, "failed to read block %v with checksum %v", blkFile, blk.BlockChecksum)
}
defer rc.Close()

r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
if r == nil {
return err
}
return errors.Wrapf(err, "failed to decompress and verify block %v with checksum %v", blkFile, blk.BlockChecksum)
}

if _, err := volDev.Seek(blk.Offset, 0); err != nil {
Expand Down
57 changes: 57 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package backupstore

import (
"compress/gzip"
"context"
"io"
"path/filepath"
"strings"
"sync"

"github.com/pkg/errors"

"github.com/longhorn/backupstore/util"
)

func getBlockPath(volumeName string) string {
Expand Down Expand Up @@ -50,3 +57,53 @@ func mergeErrorChannels(ctx context.Context, channels ...<-chan error) <-chan er
}()
return out
}

// DecompressAndVerifyWithFallback decompresses the given data and verifies the data integrity.
// If the decompression fails, it will try to decompress with the fallback method.
func DecompressAndVerifyWithFallback(bsDriver BackupStoreDriver, blkFile, decompression, checksum string) (io.Reader, error) {
// Helper function to read block from backup store
readBlock := func() (io.ReadCloser, error) {
rc, err := bsDriver.Read(blkFile)
if err != nil {
return nil, errors.Wrapf(err, "failed to read block %v", blkFile)
}
return rc, nil
}

// First attempt to read and decompress/verify
rc, err := readBlock()
if err != nil {
return nil, err
}
defer rc.Close()

r, err := util.DecompressAndVerify(decompression, rc, checksum)
if err == nil {
return r, nil
}

// If there's an error, determine the alternative decompression method
alternativeDecompression := ""
if strings.Contains(err.Error(), gzip.ErrHeader.Error()) {
alternativeDecompression = "lz4"
} else if strings.Contains(err.Error(), "lz4: bad magic number") {
alternativeDecompression = "gzip"
}

// Second attempt with alternative decompression, if applicable
if alternativeDecompression != "" {
retriedRc, err := readBlock()
if err != nil {
return nil, err
}
defer retriedRc.Close()

r, err = util.DecompressAndVerify(alternativeDecompression, retriedRc, checksum)
if err != nil {
return nil, errors.Wrapf(err, "fallback decompression also failed for block %v", blkFile)
}
return r, nil
}

return nil, errors.Wrapf(err, "decompression verification failed for block %v", blkFile)
}
34 changes: 0 additions & 34 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
lz4 "github.com/pierrec/lz4/v4"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"golang.org/x/sys/unix"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,39 +132,6 @@ func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Read
return bytes.NewReader(block), nil
}

// DecompressAndVerifyWithFallback decompresses the given data and verifies the data integrity.
// If the decompression fails, it will try to decompress with the fallback method.
func DecompressAndVerifyWithFallback(decompression string, rc io.ReadCloser, checksum string) (io.Reader, error) {
r, err := DecompressAndVerify(decompression, rc, checksum)
if err == nil {
return r, nil
}
// Fall back to other decompression method if the current one fails
// The mitigation will be removed after identifying https:/longhorn/longhorn/issues/7687
// Seek rc to offset 0
seeker, ok := rc.(io.Seeker)
if !ok {
return nil, errors.Wrapf(err, "failed to cast to io.Seeker for block %v", checksum)
}

_, errFallback := seeker.Seek(0, io.SeekStart)
if errFallback != nil {
// Merge the err1 and err2 and error out
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to seek to offset 0 for block %v", checksum)
}

if strings.Contains(err.Error(), gzip.ErrHeader.Error()) {
r, errFallback = DecompressAndVerify("lz4", rc, checksum)
} else if strings.Contains(err.Error(), "lz4: bad magic number") {
r, errFallback = DecompressAndVerify("gzip", rc, checksum)
}
if errFallback != nil {
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to decompress and verify block %v with fallback", checksum)
}

return r, err
}

func newCompressionWriter(method string, buffer io.Writer) (io.WriteCloser, error) {
switch method {
case "gzip":
Expand Down