aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-08-10 10:01:57 -0700
committerchrislu <chris.lu@gmail.com>2024-08-10 10:01:57 -0700
commit7438648d1cfacd5ca570dd029d1bdb5fd271bd70 (patch)
treecf12b49473be0373cb03d83470ddc75708454171 /weed/replication
parent49893267e978cc3fda00dc991e00099742fb5a9d (diff)
parent63c707f9c1b4dc469ec39c446563c324ce4ccb6f (diff)
downloadseaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.tar.xz
seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.zip
Merge branch 'master' into mq
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/repl_util/replication_util.go4
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go13
-rw-r--r--weed/replication/sink/filersink/fetch_write.go11
-rw-r--r--weed/replication/sink/filersink/filer_sink.go4
-rw-r--r--weed/replication/source/filer_source.go5
5 files changed, 30 insertions, 7 deletions
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index 9682ca623..4a77fd04a 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -4,7 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
- "github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
@@ -21,7 +21,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 9bbd7b8eb..890e68fd4 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -5,8 +5,10 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
+ "net/http"
"net/url"
"strings"
+ "time"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -109,7 +111,16 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
// Azure Storage account's container.
appendBlobURL := g.containerURL.NewAppendBlobURL(key)
- _, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
+ accessCondition := azblob.BlobAccessConditions{}
+ if entry.Attributes!=nil && entry.Attributes.Mtime>0 {
+ accessCondition.ModifiedAccessConditions.IfUnmodifiedSince = time.Unix(entry.Attributes.Mtime, 0)
+ }
+
+ res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
+ if res != nil && res.StatusCode() == http.StatusPreconditionFailed {
+ glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err)
+ return nil
+ }
if err != nil {
return err
}
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 63e1226b6..4bcbc7898 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
@@ -88,9 +89,15 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
if err != nil {
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
- fileId, uploadResult, err, _ := operation.UploadWithRetry(
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+ return "", fmt.Errorf("upload data: %v", err)
+ }
+
+ fileId, uploadResult, err, _ := uploader.UploadWithRetry(
fs,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index ce2de41b9..49f6877a0 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -120,6 +120,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
glog.V(3).Infof("already replicated %s", key)
return nil
}
+ if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime {
+ glog.V(3).Infof("skip overwriting %s", key)
+ return nil
+ }
}
replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key)
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 167907a5a..768e251a4 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type ReplicationSource interface {
@@ -106,7 +107,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
if fs.proxyByFiler {
- return util.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
+ return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
}
fileUrls, err := fs.LookupFileId(fileId)
@@ -115,7 +116,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
}
for _, fileUrl := range fileUrls {
- filename, header, resp, err = util.DownloadFile(fileUrl, "")
+ filename, header, resp, err = util_http.DownloadFile(fileUrl, "")
if err != nil {
glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
} else {