Support for multi master sites
krishnasrinivas committed Dec 8, 2019
1 parent c5112d4 commit 9d67abb
Showing 6 changed files with 146 additions and 50 deletions.
3 changes: 3 additions & 0 deletions cmd/client-fs.go
@@ -1186,3 +1186,6 @@ func (f *fsClient) fsStat(isIncomplete bool) (os.FileInfo, *probe.Error) {
}
return st, nil
}

func (f *fsClient) AddUserAgent(app, version string) {
}
10 changes: 10 additions & 0 deletions cmd/client-s3.go
@@ -940,6 +940,10 @@ func (c *s3Client) removeIncompleteObjects(bucket string, objectsCh <-chan strin
return removeObjectErrorCh
}

func (c *s3Client) AddUserAgent(app string, version string) {
c.api.SetAppInfo(app, version)
}

// Remove - remove object or bucket(s).
func (c *s3Client) Remove(isIncomplete, isRemoveBucket bool, contentCh <-chan *clientContent) <-chan *probe.Error {
errorCh := make(chan *probe.Error)
@@ -1197,6 +1201,7 @@ func (c *s3Client) listObjectWrapper(bucket, object string, isRecursive bool, do
if metadata {
return c.api.ListObjectsV2WithMetadata(bucket, object, isRecursive, doneCh)
}

return c.api.ListObjectsV2(bucket, object, isRecursive, doneCh)
}

@@ -1964,6 +1969,11 @@ func (c *s3Client) listRecursiveInRoutine(contentCh chan *clientContent, metadat
content.ETag = object.ETag
content.Time = object.LastModified
content.Type = os.FileMode(0664)
content.Expires = object.Expires
content.UserMetadata = map[string]string{}
for k, v := range object.UserMetadata {
content.UserMetadata[k] = v
}
contentCh <- content
}
}
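For illustration (not part of this diff): the listing change above matters because the difference logic in cmd/difference.go below reads each object's user metadata to find the X-Amz-Meta-Mm-Etag / X-Amz-Meta-Mm-Stag keys, so listRecursiveInRoutine now carries Expires and a copy of UserMetadata on every clientContent. A minimal, self-contained sketch of that copy, with clientContent reduced to a stand-in struct:

// Sketch only: copy a listing entry's user metadata and Expires into a
// clientContent-like value, as listRecursiveInRoutine now does.
package main

import (
	"fmt"
	"time"
)

type clientContent struct {
	ETag         string
	Time         time.Time
	Expires      time.Time
	UserMetadata map[string]string
}

func fromListing(etag string, modified, expires time.Time, userMeta map[string]string) *clientContent {
	c := &clientContent{ETag: etag, Time: modified, Expires: expires}
	c.UserMetadata = map[string]string{} // fresh map, never aliasing the SDK's map
	for k, v := range userMeta {
		c.UserMetadata[k] = v
	}
	return c
}

func main() {
	c := fromListing("abc123", time.Now(), time.Time{},
		map[string]string{"X-Amz-Meta-Mm-Etag": "abc123"})
	fmt.Println(c.UserMetadata["X-Amz-Meta-Mm-Etag"]) // "abc123"
}

Copying into a fresh map rather than aliasing the SDK's map keeps later changes to content.UserMetadata local to mc.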
2 changes: 2 additions & 0 deletions cmd/client.go
@@ -82,6 +82,8 @@ type Client interface {

// GetURL returns back internal url
GetURL() clientURL

AddUserAgent(app, version string)
}

// Content container for content metadata
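For illustration (not part of this diff): AddUserAgent lets mirror tag its own requests so they are identifiable in server-side notification events. The S3 implementation above is a thin wrapper over minio-go's SetAppInfo, which appends "app/version" to the User-Agent header; the filesystem client's implementation is a no-op because there is no remote request to tag. A minimal sketch, assuming the minio-go v6 API in use at the time and placeholder endpoint, credentials, and version string:

// Sketch only: tag every request from this client with "mc-mirror/<version>".
package main

import (
	"log"

	minio "github.com/minio/minio-go/v6"
)

func main() {
	// Placeholder endpoint and credentials.
	api, err := minio.New("play.min.io", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatal(err)
	}
	// Roughly what s3Client.AddUserAgent(uaMirrorAppName, Version) does in the
	// diff: subsequent requests advertise "mc-mirror", so bucket notification
	// events caused by mirror's own removes can be recognized and skipped in
	// watchMirror.
	api.SetAppInfo("mc-mirror", "0.0.1") // version string is illustrative
}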
70 changes: 55 additions & 15 deletions cmd/difference.go
@@ -17,7 +17,6 @@
package cmd

import (
"reflect"
"strings"
"unicode/utf8"

@@ -31,6 +30,7 @@ type differType int

const (
differInNone differType = iota // does not differ
differInETag // differs in ETag
differInSize // differs in size
differInTime // differs in time
differInMetadata // differs in metadata
@@ -67,6 +67,43 @@ func dirDifference(sourceClnt, targetClnt Client, sourceURL, targetURL string) (
return difference(sourceClnt, targetClnt, sourceURL, targetURL, false, false, true, DirFirst)
}

const multiMasterETagKey = "X-Amz-Meta-Mm-Etag"
const multiMasterSTagKey = "X-Amz-Meta-Mm-Stag"

func eTagMatch(src, tgt *clientContent) bool {
if tgt.UserMetadata[multiMasterETagKey] != "" {
if tgt.UserMetadata[multiMasterETagKey] == src.UserMetadata[multiMasterETagKey] || tgt.UserMetadata[multiMasterETagKey] == src.ETag {
return true
}
}
if src.UserMetadata[multiMasterETagKey] != "" {
if src.UserMetadata[multiMasterETagKey] == tgt.UserMetadata[multiMasterETagKey] || src.UserMetadata[multiMasterETagKey] == tgt.ETag {
return true
}
}
return src.ETag == tgt.ETag
}

func metadataEqual(m1, m2 map[string]string) bool {
for k, v := range m1 {
if k == multiMasterETagKey || k == multiMasterSTagKey {
continue
}
if m2[k] != v {
return false
}
}
for k, v := range m2 {
if k == multiMasterETagKey || k == multiMasterSTagKey {
continue
}
if m1[k] != v {
return false
}
}
return true
}

// objectDifference function finds the difference between all objects
// recursively in sorted order from source and target.
func difference(sourceClnt, targetClnt Client, sourceURL, targetURL string, isMetadata bool, isRecursive, returnSimilar bool, dirOpt DirOpt) (diffCh chan diffMessage) {
@@ -169,7 +206,6 @@ func difference(sourceClnt, targetClnt Client, sourceURL, targetURL string, isMe
if normalizedExpected == normalizedCurrent {
srcType, tgtType := srcCtnt.Type, tgtCtnt.Type
srcSize, tgtSize := srcCtnt.Size, tgtCtnt.Size
srcTime, tgtTime := srcCtnt.Time, tgtCtnt.Time
if srcType.IsRegular() && !tgtType.IsRegular() ||
!srcType.IsRegular() && tgtType.IsRegular() {
// Type differs. Source is never a directory.
@@ -182,7 +218,21 @@
}
continue
}
if (srcType.IsRegular() && tgtType.IsRegular()) && srcSize != tgtSize {
if eTagMatch(srcCtnt, tgtCtnt) {
// If the ETags match, the only thing that can differ is metadata.
if isMetadata &&
!metadataEqual(srcCtnt.UserMetadata, tgtCtnt.UserMetadata) &&
!metadataEqual(srcCtnt.Metadata, tgtCtnt.Metadata) {
// Same data; report only the metadata difference the user asked to compare.
diffCh <- diffMessage{
FirstURL: srcCtnt.URL.String(),
SecondURL: tgtCtnt.URL.String(),
Diff: differInMetadata,
firstContent: srcCtnt,
secondContent: tgtCtnt,
}
}
} else if (srcType.IsRegular() && tgtType.IsRegular()) && srcSize != tgtSize {
// Regular files differing in size.
diffCh <- diffMessage{
FirstURL: srcCtnt.URL.String(),
@@ -192,9 +242,8 @@ func difference(sourceClnt, targetClnt Client, sourceURL, targetURL string, isMe
secondContent: tgtCtnt,
}
} else if isMetadata &&
!reflect.DeepEqual(srcCtnt.UserMetadata, tgtCtnt.UserMetadata) &&
!reflect.DeepEqual(srcCtnt.Metadata, tgtCtnt.Metadata) {

!metadataEqual(srcCtnt.UserMetadata, tgtCtnt.UserMetadata) &&
!metadataEqual(srcCtnt.Metadata, tgtCtnt.Metadata) {
// Regular files user requesting additional metadata to same file.
diffCh <- diffMessage{
FirstURL: srcCtnt.URL.String(),
@@ -203,15 +252,6 @@ func difference(sourceClnt, targetClnt Client, sourceURL, targetURL string, isMe
firstContent: srcCtnt,
secondContent: tgtCtnt,
}
} else if srcTime.After(tgtTime) {
// Regular files differing in timestamp.
diffCh <- diffMessage{
FirstURL: srcCtnt.URL.String(),
SecondURL: tgtCtnt.URL.String(),
Diff: differInTime,
firstContent: srcCtnt,
secondContent: tgtCtnt,
}
}
// No differ
if returnSimilar {
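For illustration (not part of this diff): a mirrored copy records the original's ETag under X-Amz-Meta-Mm-Etag, so eTagMatch treats the two objects as identical even when their native ETags disagree (as with multipart uploads or server-side encryption), and the old timestamp comparison that used to force re-copies is removed. A self-contained sketch, with clientContent reduced to the two fields involved:

// Sketch only: the matching rule used by the new difference logic.
package main

import "fmt"

const multiMasterETagKey = "X-Amz-Meta-Mm-Etag"

type clientContent struct {
	ETag         string
	UserMetadata map[string]string
}

func eTagMatch(src, tgt *clientContent) bool {
	if tgt.UserMetadata[multiMasterETagKey] != "" {
		if tgt.UserMetadata[multiMasterETagKey] == src.UserMetadata[multiMasterETagKey] ||
			tgt.UserMetadata[multiMasterETagKey] == src.ETag {
			return true
		}
	}
	if src.UserMetadata[multiMasterETagKey] != "" {
		if src.UserMetadata[multiMasterETagKey] == tgt.UserMetadata[multiMasterETagKey] ||
			src.UserMetadata[multiMasterETagKey] == tgt.ETag {
			return true
		}
	}
	return src.ETag == tgt.ETag
}

func main() {
	// Object first written on site A with ETag "abc"; its mirrored copy on
	// site B got a different native ETag but stored "abc" in user metadata.
	src := &clientContent{ETag: "abc"}
	tgt := &clientContent{ETag: "def", UserMetadata: map[string]string{multiMasterETagKey: "abc"}}
	fmt.Println(eTagMatch(src, tgt)) // true: the object is not re-queued for copy
}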
107 changes: 74 additions & 33 deletions cmd/mirror-main.go
@@ -67,6 +67,10 @@ var (
Name: "preserve, a",
Usage: "preserve file(s)/object(s) attributes and bucket policy rules on target bucket(s)",
},
cli.StringFlag{
Name: "multi-master",
Usage: "tag for multi master mode",
},
cli.StringSliceFlag{
Name: "exclude",
Usage: "exclude object(s) that match specified object name pattern",
@@ -163,6 +167,8 @@ EXAMPLES:
`,
}

const uaMirrorAppName = "mc-mirror"

type mirrorJob struct {

// the channel to trap SIGKILL signals
@@ -198,6 +204,9 @@ type mirrorJob struct {

excludeOptions []string
encKeyDB map[string][]prefixSSEPair

multiMasterEnable bool
multiMasterSTag string
}

// mirrorMessage container for file mirror messages
@@ -236,7 +245,7 @@ func (mj *mirrorJob) doRemove(sURLs URLs) URLs {
if pErr != nil {
return sURLs.WithError(pErr)
}

clnt.AddUserAgent(uaMirrorAppName, Version)
contentCh := make(chan *clientContent, 1)
contentCh <- &clientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path)}
close(contentCh)
@@ -288,6 +297,19 @@ func (mj *mirrorJob) doMirror(ctx context.Context, cancelMirror context.CancelFu
sURLs.TargetContent.Metadata["X-Amz-Storage-Class"] = mj.storageClass
}

// Set multiMasterETagKey for the target.
if sURLs.SourceContent.UserMetadata[multiMasterETagKey] != "" {
sURLs.TargetContent.Metadata[multiMasterETagKey] = sURLs.SourceContent.UserMetadata[multiMasterETagKey]
} else {
sURLs.TargetContent.Metadata[multiMasterETagKey] = sURLs.SourceContent.ETag
}

if sURLs.SourceContent.UserMetadata[multiMasterSTagKey] != "" {
sURLs.TargetContent.Metadata[multiMasterSTagKey] = sURLs.SourceContent.UserMetadata[multiMasterSTagKey]
} else {
sURLs.TargetContent.Metadata[multiMasterSTagKey] = mj.multiMasterSTag
}

if mj.isPreserve {
attrValue, pErr := getFileAttrMeta(sURLs, mj.encKeyDB)
if pErr != nil {
@@ -404,28 +426,34 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance

if (event.Type == EventCreate) ||
(event.Type == EventCreatePutRetention) {
// we are checking if a destination file exists now, and if we only
// overwrite it when force is enabled.
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: &clientContent{URL: *sourceURL, Retention: event.Type == EventCreatePutRetention},
TargetAlias: targetAlias,
TargetContent: &clientContent{URL: *targetURL},
encKeyDB: mj.encKeyDB,
}
sourceClient, err := newClient(aliasedPath)
if err != nil {
// cannot create sourceclient
mj.statusCh <- mirrorURL.WithError(err)
continue
}
// we are checking if a destination file exists now, and if we only
// overwrite it when force is enabled.
sourceContent, err := sourceClient.Stat(false, true, false, srcSSE)
if err != nil {
// source doesn't exist anymore
mj.statusCh <- mirrorURL.WithError(err)
continue
}
if sourceContent.Metadata[multiMasterETagKey] != "" {
// If the source has multiMasterETagKey set, it means the object was uploaded by "mc mirror"
// hence ignore the event to avoid copying it.
continue
}
mirrorURL.SourceContent = sourceContent
if event.Size == 0 {
sourceClient, err := newClient(aliasedPath)
if err != nil {
// cannot create sourceclient
mj.statusCh <- mirrorURL.WithError(err)
continue
}
sourceContent, err := sourceClient.Stat(false, false, false, srcSSE)
if err != nil {
// source doesn't exist anymore
mj.statusCh <- mirrorURL.WithError(err)
continue
}
targetClient, err := newClient(targetPath)
if err != nil {
// cannot create targetclient
@@ -440,7 +468,7 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
} // doesn't exist
shouldQueue = true
}
if shouldQueue || mj.isOverwrite {
if shouldQueue || mj.isOverwrite || mj.multiMasterEnable {
// adjust total, because we want to show progress of
// the item still queued to be copied.
mj.status.Add(sourceContent.Size)
@@ -453,7 +481,7 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
continue
}
shouldQueue := false
if !mj.isOverwrite {
if !mj.isOverwrite && !mj.multiMasterEnable {
targetClient, err := newClient(targetPath)
if err != nil {
// cannot create targetclient
@@ -470,7 +498,7 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
} // doesn't exist
shouldQueue = true
}
if shouldQueue || mj.isOverwrite {
if shouldQueue || mj.isOverwrite || mj.multiMasterEnable {
mirrorURL.SourceContent.Size = event.Size
// adjust total, because we want to show progress
// of the item still queued to be copied.
@@ -482,6 +510,9 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL)
}
} else if event.Type == EventRemove {
if strings.Contains(event.UserAgent, uaMirrorAppName) {
continue
}
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: nil,
@@ -491,7 +522,7 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
}
mirrorURL.TotalCount = mj.status.GetCounts()
mirrorURL.TotalSize = mj.status.Get()
if mirrorURL.TargetContent != nil && mj.isRemove {
if mirrorURL.TargetContent != nil && (mj.isRemove || mj.multiMasterEnable) {
mj.statusCh <- mj.doRemove(mirrorURL)
}
}
@@ -603,27 +634,32 @@ func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc
return mj.monitorMirrorStatus()
}

func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch, isPreserve bool, excludeOptions []string, olderThan, newerThan string, storageClass string, userMetadata map[string]string, encKeyDB map[string][]prefixSSEPair) *mirrorJob {
func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch, isPreserve, multiMasterEnable bool, excludeOptions []string, olderThan, newerThan string, storageClass string, multiMasterSTag string, userMetadata map[string]string, encKeyDB map[string][]prefixSSEPair) *mirrorJob {
if multiMasterEnable {
isPreserve = true
}
mj := mirrorJob{
trapCh: signalTrap(os.Interrupt, syscall.SIGTERM, syscall.SIGKILL),
m: new(sync.Mutex),

sourceURL: srcURL,
targetURL: dstURL,

isFake: isFake,
isRemove: isRemove,
isOverwrite: isOverwrite,
isWatch: isWatch,
isPreserve: isPreserve,
excludeOptions: excludeOptions,
olderThan: olderThan,
newerThan: newerThan,
storageClass: storageClass,
userMetadata: userMetadata,
encKeyDB: encKeyDB,
statusCh: make(chan URLs),
watcher: NewWatcher(UTCNow()),
isFake: isFake,
isRemove: isRemove,
isOverwrite: isOverwrite,
isWatch: isWatch,
isPreserve: isPreserve,
excludeOptions: excludeOptions,
olderThan: olderThan,
newerThan: newerThan,
storageClass: storageClass,
userMetadata: userMetadata,
encKeyDB: encKeyDB,
statusCh: make(chan URLs),
watcher: NewWatcher(UTCNow()),
multiMasterEnable: multiMasterEnable,
multiMasterSTag: multiMasterSTag,
}

mj.parallel, mj.queueCh = newParallelManager(mj.statusCh)
@@ -702,17 +738,22 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context, encKeyDB map[string][]pr
fatalIf(err, "Unable to initialize `"+dstURL+"`.")
}

multiMasterSTag := ctx.String("multi-master")
multiMasterEnable := multiMasterSTag != ""

// Create a new mirror job and execute it
mj := newMirrorJob(srcURL, dstURL,
ctx.Bool("fake"),
ctx.Bool("remove"),
isOverwrite,
ctx.Bool("watch"),
ctx.Bool("a"),
multiMasterEnable,
ctx.StringSlice("exclude"),
ctx.String("older-than"),
ctx.String("newer-than"),
ctx.String("storage-class"),
multiMasterSTag,
userMetaMap,
encKeyDB)

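For illustration (not part of this diff): with --watch running on both sites, the two guards added to watchMirror are what keep the sites from bouncing the same change back and forth. Create events for objects that already carry X-Amz-Meta-Mm-Etag were written by the peer's mc mirror and are skipped, and remove events whose user agent contains "mc-mirror" were issued by mirror itself and are likewise skipped. A hypothetical two-site setup might run mc mirror --watch --multi-master site-a alias1/bucket alias2/bucket on one site and the mirror-image command on the other (aliases and tag value are illustrative). A condensed sketch of the two checks, with the event shape simplified:

// Sketch only: the loop-prevention checks applied in multi-master mode.
package main

import (
	"fmt"
	"strings"
)

const (
	multiMasterETagKey = "X-Amz-Meta-Mm-Etag"
	uaMirrorAppName    = "mc-mirror"
)

// watchEvent is a simplified stand-in for a notification event plus the
// stat of the object behind it.
type watchEvent struct {
	Type      string            // "create" or "remove"
	UserAgent string            // user agent reported with the notification
	Metadata  map[string]string // metadata of the object behind the event
}

// shouldPropagate reports whether the event should be mirrored to the peer.
func shouldPropagate(e watchEvent) bool {
	switch e.Type {
	case "create":
		// Objects written by mc mirror already carry the mm-etag key: the
		// peer put them there, so copying them back would loop forever.
		return e.Metadata[multiMasterETagKey] == ""
	case "remove":
		// Removes issued by mc mirror itself arrive with the "mc-mirror"
		// user agent and must not be propagated again.
		return !strings.Contains(e.UserAgent, uaMirrorAppName)
	}
	return false
}

func main() {
	fmt.Println(shouldPropagate(watchEvent{Type: "create"}))                                                       // true
	fmt.Println(shouldPropagate(watchEvent{Type: "create", Metadata: map[string]string{multiMasterETagKey: "x"}})) // false
	fmt.Println(shouldPropagate(watchEvent{Type: "remove", UserAgent: "MinIO (linux; amd64) mc-mirror/RELEASE"}))  // false
}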