diff options
| author | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
| commit | 7438648d1cfacd5ca570dd029d1bdb5fd271bd70 (patch) | |
| tree | cf12b49473be0373cb03d83470ddc75708454171 /weed/replication | |
| parent | 49893267e978cc3fda00dc991e00099742fb5a9d (diff) | |
| parent | 63c707f9c1b4dc469ec39c446563c324ce4ccb6f (diff) | |
| download | seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.tar.xz seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.zip | |
Merge branch 'master' into mq
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/repl_util/replication_util.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 13 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 11 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 4 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 5 |
5 files changed, 30 insertions, 7 deletions
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go index 9682ca623..4a77fd04a 100644 --- a/weed/replication/repl_util/replication_util.go +++ b/weed/replication/repl_util/replication_util.go @@ -4,7 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/replication/source" - "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error { @@ -21,7 +21,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 9bbd7b8eb..890e68fd4 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -5,8 +5,10 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/replication/repl_util" + "net/http" "net/url" "strings" + "time" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -109,7 +111,16 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] // Azure Storage account's container. appendBlobURL := g.containerURL.NewAppendBlobURL(key) - _, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{}) + accessCondition := azblob.BlobAccessConditions{} + if entry.Attributes!=nil && entry.Attributes.Mtime>0 { + accessCondition.ModifiedAccessConditions.IfUnmodifiedSince = time.Unix(entry.Attributes.Mtime, 0) + } + + res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{}) + if res != nil && res.StatusCode() == http.StatusPreconditionFailed { + glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err) + return nil + } if err != nil { return err } diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 63e1226b6..4bcbc7898 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) { @@ -88,9 +89,15 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) if err != nil { return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) - fileId, uploadResult, err, _ := operation.UploadWithRetry( + uploader, err := operation.NewUploader() + if err != nil { + glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err) + return "", fmt.Errorf("upload data: %v", err) + } + + fileId, uploadResult, err, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index ce2de41b9..49f6877a0 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -120,6 +120,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ glog.V(3).Infof("already replicated %s", key) return nil } + if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime { + glog.V(3).Infof("skip overwriting %s", key) + return nil + } } replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key) diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 167907a5a..768e251a4 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type ReplicationSource interface { @@ -106,7 +107,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { if fs.proxyByFiler { - return util.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") } fileUrls, err := fs.LookupFileId(fileId) @@ -115,7 +116,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea } for _, fileUrl := range fileUrls { - filename, header, resp, err = util.DownloadFile(fileUrl, "") + filename, header, resp, err = util_http.DownloadFile(fileUrl, "") if err != nil { glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) } else { |
