diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 11 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 12 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 12 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 20 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 23 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 12 | ||||
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 7 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 11 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 11 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 14 |
10 files changed, 62 insertions, 71 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 48a81a093..20c1d08b5 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -1,6 +1,7 @@ package replication import ( + "context" "path/filepath" "strings" @@ -29,7 +30,7 @@ func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSin } } -func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { +func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_pb.EventNotification) error { if !strings.HasPrefix(key, r.source.Dir) { glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) return nil @@ -39,23 +40,23 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) key = newKey if message.OldEntry != nil && message.NewEntry == nil { glog.V(4).Infof("deleting %v", key) - return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks) + return r.sink.DeleteEntry(ctx, key, message.OldEntry.IsDirectory, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { glog.V(4).Infof("creating %v", key) - return r.sink.CreateEntry(key, message.NewEntry) + return r.sink.CreateEntry(ctx, key, message.NewEntry) } if message.OldEntry == nil && message.NewEntry == nil { glog.V(0).Infof("weird message %+v", message) return nil } - foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks) + foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewEntry, message.DeleteChunks) if foundExisting { glog.V(4).Infof("updated %v", key) return err } glog.V(4).Infof("creating missing %v", key) - return r.sink.CreateEntry(key, message.NewEntry) + return r.sink.CreateEntry(ctx, key, message.NewEntry) } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 7acf37fa5..760fbdbb5 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -70,7 +70,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e return nil } -func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *AzureSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -78,8 +78,6 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo key = key + "/" } - ctx := context.Background() - if _, err := g.containerURL.NewBlobURL(key).Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil { return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err) @@ -89,7 +87,7 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo } -func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -100,8 +98,6 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - ctx := context.Background() - // Create a URL that references a to-be-created blob in your // Azure Storage account's container. appendBlobURL := g.containerURL.NewAppendBlobURL(key) @@ -113,7 +109,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) if err != nil { return err } @@ -136,7 +132,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *AzureSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 17f5e39b2..c80bfcc49 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -58,7 +58,7 @@ func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error { return nil } -func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -66,8 +66,6 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) key = key + "/" } - ctx := context.Background() - bucket, err := g.client.Bucket(ctx, g.bucket) if err != nil { return err @@ -79,7 +77,7 @@ func (g *B2Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) } -func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -90,8 +88,6 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - ctx := context.Background() - bucket, err := g.client.Bucket(ctx, g.bucket) if err != nil { return err @@ -102,7 +98,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) if err != nil { return err } @@ -128,7 +124,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *B2Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index f1306ca4c..0f3473ff2 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -22,7 +22,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replic wg.Add(1) go func(chunk *filer_pb.FileChunk) { defer wg.Done() - replicatedChunk, e := fs.replicateOneChunk(chunk) + replicatedChunk, e := fs.replicateOneChunk(ctx, chunk) if e != nil { err = e } @@ -34,9 +34,9 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk) (replic return } -func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(sourceChunk) + fileId, err := fs.fetchAndWrite(ctx, sourceChunk) if err != nil { return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) } @@ -51,9 +51,9 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_ }, nil } -func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) { - filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.FileId) + filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.FileId) if err != nil { return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err) } @@ -62,7 +62,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri var host string var auth security.EncodedJwt - if err := fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -72,7 +72,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri DataCenter: fs.dataCenter, } - resp, err := client.AssignVolume(context.Background(), request) + resp, err := client.AssignVolume(ctx, request) if err != nil { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err @@ -103,9 +103,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri return } -func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(fs.grpcAddress, fs.grpcDialOption) + grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, fs.grpcDialOption) if err != nil { return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 2eb326b83..777c28620 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -63,8 +63,8 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, return nil } -func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { +func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { + return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -75,7 +75,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo } glog.V(1).Infof("delete entry: %v", request) - _, err := client.DeleteEntry(context.Background(), request) + _, err := client.DeleteEntry(ctx, request) if err != nil { glog.V(0).Infof("delete entry %s: %v", key, err) return fmt.Errorf("delete entry %s: %v", key, err) @@ -85,12 +85,11 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo }) } -func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { - return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() - ctx := context.Background() // look up existing entry lookupRequest := &filer_pb.LookupDirectoryEntryRequest{ @@ -105,7 +104,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { } } - replicatedChunks, err := fs.replicateChunks(entry.Chunks) + replicatedChunks, err := fs.replicateChunks(ctx, entry.Chunks) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) @@ -134,15 +133,13 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { - - ctx := context.Background() +func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { dir, name := filer2.FullPath(key).DirAndName() // read existing entry var existingEntry *filer_pb.Entry - err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -186,7 +183,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(newChunks) + replicatedChunks, err := fs.replicateChunks(ctx, newChunks) if err != nil { return true, fmt.Errorf("replicte %s chunks error: %v", key, err) } @@ -194,7 +191,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, } // save updated meta data - return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: dir, diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index c1beefc33..6b710a12a 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -69,13 +69,13 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str return nil } -func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (g *GcsSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { if isDirectory { key = key + "/" } - if err := g.client.Bucket(g.bucket).Object(key).Delete(context.Background()); err != nil { + if err := g.client.Bucket(g.bucket).Object(key).Delete(ctx); err != nil { return fmt.Errorf("gcs delete %s%s: %v", g.bucket, key, err) } @@ -83,7 +83,7 @@ func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) } -func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { if entry.IsDirectory { return nil @@ -92,13 +92,11 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { totalSize := filer2.TotalSize(entry.Chunks) chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize)) - ctx := context.Background() - wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx) for _, chunk := range chunkViews { - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) + fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId) if err != nil { return err } @@ -121,7 +119,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } -func (g *GcsSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { // TODO improve efficiency return false, nil } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index 0a86139d3..984aebc58 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -1,6 +1,7 @@ package sink import ( + "context" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" @@ -9,9 +10,9 @@ import ( type ReplicationSink interface { GetName() string Initialize(configuration util.Configuration) error - DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error - CreateEntry(key string, entry *filer_pb.Entry) error - UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) + DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error + CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error + UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 0a4e78318..a5b52095c 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -1,6 +1,7 @@ package S3Sink import ( + "context" "fmt" "strings" "sync" @@ -76,7 +77,7 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, buc return nil } -func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error { +func (s3sink *S3Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error { key = cleanKey(key) @@ -88,7 +89,7 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b } -func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { +func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error { key = cleanKey(key) @@ -111,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { wg.Add(1) go func(chunk *filer2.ChunkView) { defer wg.Done() - if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { + if part, uploadErr := s3sink.uploadPart(ctx, key, uploadId, partId, chunk); uploadErr != nil { err = uploadErr } else { parts = append(parts, part) @@ -125,11 +126,11 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { return err } - return s3sink.completeMultipartUpload(key, uploadId, parts) + return s3sink.completeMultipartUpload(ctx, key, uploadId, parts) } -func (s3sink *S3Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index 5c4be7aee..0a190b27d 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -2,6 +2,7 @@ package S3Sink import ( "bytes" + "context" "fmt" "io" @@ -81,7 +82,7 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error { } // To complete multipart upload -func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3.CompletedPart) error { +func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error { input := &s3.CompleteMultipartUploadInput{ Bucket: aws.String(s3sink.bucket), Key: aws.String(key), @@ -102,10 +103,10 @@ func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3. } // To upload a part -func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) { +func (s3sink *S3Sink) uploadPart(ctx context.Context, key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) { var readSeeker io.ReadSeeker - readSeeker, err := s3sink.buildReadSeeker(chunk) + readSeeker, err := s3sink.buildReadSeeker(ctx, chunk) if err != nil { glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) @@ -155,8 +156,8 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou return err } -func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, error) { - fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId) +func (s3sink *S3Sink) buildReadSeeker(ctx context.Context, chunk *filer2.ChunkView) (io.ReadSeeker, error) { + fileUrl, err := s3sink.filerSource.LookupFileId(ctx, chunk.FileId) if err != nil { return nil, err } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 92c2d203d..3ab6c7261 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -39,16 +39,16 @@ func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { return nil } -func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { +func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) vid := volumeId(part) - err = fs.withFilerClient(fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read lookup volume id locations: %v", vid) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) if err != nil { @@ -77,9 +77,9 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { return } -func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { +func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) { - fileUrl, err := fs.LookupFileId(part) + fileUrl, err := fs.LookupFileId(ctx, part) if err != nil { return "", nil, nil, err } @@ -89,9 +89,9 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade return filename, header, readCloser, err } -func (fs *FilerSource) withFilerClient(grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(fs.grpcAddress, grpcDialOption) + grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, grpcDialOption) if err != nil { return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) } |
