diff options
| author | ustuzhanin <55892859+ustuzhanin@users.noreply.github.com> | 2020-10-02 22:47:25 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-02 22:47:25 +0500 |
| commit | 3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch) | |
| tree | e0b42e531d18136d9e272258187a305690ee2b4d /weed/replication | |
| parent | cbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff) | |
| parent | 9ab98fa912814686b3035a97b5173c1628fbc0fc (diff) | |
| download | seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip | |
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 26 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 9 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 62 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 6 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 14 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 6 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 10 |
10 files changed, 97 insertions, 78 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 051199adb..c4228434f 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -3,6 +3,8 @@ package replication import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "google.golang.org/grpc" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -43,28 +45,42 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p key = newKey if message.OldEntry != nil && message.NewEntry == nil { glog.V(4).Infof("deleting %v", key) - return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) } if message.OldEntry == nil && message.NewEntry != nil { glog.V(4).Infof("creating %v", key) - return r.sink.CreateEntry(key, message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) } if message.OldEntry == nil && message.NewEntry == nil { glog.V(0).Infof("weird message %+v", message) return nil } - foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks) + foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) if foundExisting { glog.V(4).Infof("updated %v", key) return err } - err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false) + err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false, message.Signatures) if err != nil { return fmt.Errorf("delete old entry %v: %v", key, err) } glog.V(4).Infof("creating missing %v", key) - return r.sink.CreateEntry(key, message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) +} + +func ReadFilerSignature(grpcDialOption grpc.DialOption, filer string) (filerSignature int32, readErr error) { + if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil { + return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err) + } else { + filerSignature = resp.Signature + } + return nil + }); readErr != nil { + return 0, readErr + } + return filerSignature, nil } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index fa229de22..dab5cf4f4 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" @@ -70,7 +70,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e return nil } -func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { key = cleanKey(key) @@ -87,7 +87,7 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo } -func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { key = cleanKey(key) @@ -95,8 +95,8 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { return nil } - totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) // Create a URL that references a to-be-created blob in your // Azure Storage account's container. @@ -115,7 +115,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } var writeErr error - readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil) }) @@ -132,7 +132,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index bf8632827..cf212f129 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -4,7 +4,7 @@ import ( "context" "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -57,7 +57,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { return nil } -func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { key = cleanKey(key) @@ -76,7 +76,7 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) } -func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { key = cleanKey(key) @@ -84,8 +84,8 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { return nil } - totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { @@ -103,7 +103,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } var writeErr error - readErr := util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { _, err := writer.Write(data) if err != nil { writeErr = err @@ -123,7 +123,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *B2Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { key = cleanKey(key) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index bde29176c..d33669447 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,6 +3,7 @@ package filersink import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "sync" "google.golang.org/grpc" @@ -59,11 +60,11 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir stri func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { - filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) + filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) if err != nil { return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) } - defer readCloser.Close() + defer util.CloseResponse(resp) var host string var auth security.EncodedJwt @@ -100,9 +101,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) // fetch data as is, regardless whether it is encrypted or not - uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) + uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) return "", fmt.Errorf("upload data: %v", err) } if uploadResult.Error != "" { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 6429859b4..f1d8ff840 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -8,7 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" @@ -40,36 +40,36 @@ func (fs *FilerSink) GetSinkToDirectory() string { } func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { - return fs.initialize( + return fs.DoInitialize( configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), configuration.GetString(prefix+"replication"), configuration.GetString(prefix+"collection"), configuration.GetInt(prefix+"ttlSec"), - ) + security.LoadClientTLS(util.GetViper(), "grpc.client")) } func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { fs.filerSource = s } -func (fs *FilerSink) initialize(grpcAddress string, dir string, - replication string, collection string, ttlSec int) (err error) { +func (fs *FilerSink) DoInitialize(grpcAddress string, dir string, + replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption) (err error) { fs.grpcAddress = grpcAddress fs.dir = dir fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) - fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fs.grpcDialOption = grpcDialOption return nil } -func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { dir, name := util.FullPath(key).DirAndName() - glog.V(1).Infof("delete entry: %v", key) - err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true) + glog.V(4).Infof("delete entry: %v", key) + err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures) if err != nil { glog.V(0).Infof("delete entry %s: %v", key, err) return fmt.Errorf("delete entry %s: %v", key, err) @@ -77,7 +77,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo return nil } -func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -90,8 +90,8 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { } glog.V(1).Infof("lookup: %v", lookupRequest) if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil { - if filer2.ETag(resp.Entry) == filer2.ETag(entry) { - glog.V(0).Infof("already replicated %s", key) + if filer.ETag(resp.Entry) == filer.ETag(entry) { + glog.V(3).Infof("already replicated %s", key) return nil } } @@ -99,11 +99,11 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { replicatedChunks, err := fs.replicateChunks(entry.Chunks, dir) if err != nil { - glog.V(0).Infof("replicate entry chunks %s: %v", key, err) - return fmt.Errorf("replicate entry chunks %s: %v", key, err) + // only warning here since the source chunk may have been deleted already + glog.Warningf("replicate entry chunks %s: %v", key, err) } - glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) + glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -114,9 +114,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { Chunks: replicatedChunks, }, IsFromOtherCluster: true, + Signatures: signatures, } - glog.V(1).Infof("create: %v", request) + glog.V(3).Infof("create: %v", request) if err := filer_pb.CreateEntry(client, request); err != nil { glog.V(0).Infof("create entry %s: %v", key, err) return fmt.Errorf("create entry %s: %v", key, err) @@ -126,7 +127,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { dir, name := util.FullPath(key).DirAndName() @@ -155,19 +156,19 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent return false, fmt.Errorf("lookup %s: %v", key, err) } - glog.V(0).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry) + glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry) if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime { // skip if already changed // this usually happens when the messages are not ordered - glog.V(0).Infof("late updates %s", key) - } else if filer2.ETag(newEntry) == filer2.ETag(existingEntry) { + glog.V(2).Infof("late updates %s", key) + } else if filer.ETag(newEntry) == filer.ETag(existingEntry) { // skip if no change // this usually happens when retrying the replication - glog.V(0).Infof("already replicated %s", key) + glog.V(3).Infof("already replicated %s", key) } else { // find out what changed - deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry) + deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry) if err != nil { return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err) } @@ -175,7 +176,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer.DoMinusChunks(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source @@ -193,6 +194,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent Directory: newParentPath, Entry: existingEntry, IsFromOtherCluster: true, + Signatures: signatures, } if _, err := client.UpdateEntry(context.Background(), request); err != nil { @@ -203,21 +205,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { - aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) +func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { + aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) if aErr != nil { return nil, nil, aErr } - bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) + bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) if bErr != nil { return nil, nil, bErr } - deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...) - deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...) + deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...) + deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...) - newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...) - newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...) + newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...) + newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...) return } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index 4b58160db..c6bfa212a 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -8,7 +8,7 @@ import ( "cloud.google.com/go/storage" "google.golang.org/api/option" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" @@ -69,7 +69,7 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str return nil } -func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { if isDirectory { key = key + "/" @@ -83,14 +83,14 @@ func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) } -func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { if entry.IsDirectory { return nil } - totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) @@ -101,7 +101,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { return err } - err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { wc.Write(data) }) @@ -119,7 +119,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { // TODO improve efficiency return false, nil } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index 6d85f660a..cfc6e0a4d 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -9,9 +9,9 @@ import ( type ReplicationSink interface { GetName() string Initialize(configuration util.Configuration, prefix string) error - DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error - CreateEntry(key string, entry *filer_pb.Entry) error - UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) + DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error + CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error + UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 625cf406c..58432ee6b 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -12,7 +12,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" @@ -83,7 +83,7 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc return nil } -func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { key = cleanKey(key) @@ -95,7 +95,7 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b } -func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { key = cleanKey(key) if entry.IsDirectory { @@ -107,8 +107,8 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { return err } - totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) @@ -116,7 +116,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { for chunkIndex, chunk := range chunkViews { partId := chunkIndex + 1 wg.Add(1) - go func(chunk *filer2.ChunkView, index int) { + go func(chunk *filer.ChunkView, index int) { defer wg.Done() if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { err = uploadErr @@ -136,7 +136,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index c5c65ed5c..8a8e7a92b 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -9,7 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -103,7 +103,7 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId } // To upload a part -func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) { +func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.ChunkView) (*s3.CompletedPart, error) { var readSeeker io.ReadSeeker readSeeker, err := s3sink.buildReadSeeker(chunk) @@ -156,7 +156,7 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou return err } -func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, error) { +func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) { fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId) if err != nil { return nil, err diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 69c23fe82..9106ee98b 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -28,13 +28,13 @@ type FilerSource struct { } func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { - return fs.initialize( + return fs.DoInitialize( configuration.GetString(prefix+"grpcAddress"), configuration.GetString(prefix+"directory"), ) } -func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { +func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.Dir = dir fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") @@ -79,16 +79,16 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { return } -func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) { fileUrl, err := fs.LookupFileId(part) if err != nil { return "", nil, nil, err } - filename, header, readCloser, err = util.DownloadFile(fileUrl) + filename, header, resp, err = util.DownloadFile(fileUrl) - return filename, header, readCloser, err + return filename, header, resp, err } var _ = filer_pb.FilerClient(&FilerSource{}) |
