aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-01-26 14:42:11 -0800
committerChris Lu <chris.lu@gmail.com>2020-01-26 14:42:11 -0800
commit72a64a5cf8c2a5adfe59665a746e013ca948e681 (patch)
treea0fd30cd09d5f6f5a4f8031818f1b12bf7b85f4f /weed/replication
parent0c298ef8906816b40b19db36be673af564af032a (diff)
downloadseaweedfs-72a64a5cf8c2a5adfe59665a746e013ca948e681.tar.xz
seaweedfs-72a64a5cf8c2a5adfe59665a746e013ca948e681.zip
use the same context object in order to retry
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/sink/filersink/fetch_write.go8
-rw-r--r--weed/replication/sink/filersink/filer_sink.go8
-rw-r--r--weed/replication/source/filer_source.go8
3 files changed, 12 insertions, 12 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 97e9671a3..26c055da5 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -63,7 +63,7 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi
var host string
var auth security.EncodedJwt
- if err := fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ if err := fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -104,11 +104,11 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi
return
}
-func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
+ return fn(ctx, client)
}, fs.grpcAddress, fs.grpcDialOption)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 58a3ab9c2..4790d1562 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -64,7 +64,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
}
func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
- return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
dir, name := filer2.FullPath(key).DirAndName()
@@ -87,7 +87,7 @@ func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, d
func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
- return fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
dir, name := filer2.FullPath(key).DirAndName()
@@ -139,7 +139,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
// read existing entry
var existingEntry *filer_pb.Entry
- err = fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@@ -191,7 +191,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
}
// save updated meta data
- return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return true, fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: newParentPath,
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index d7b5ebc4d..aef13be75 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -45,7 +45,7 @@ func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl s
vid := volumeId(part)
- err = fs.withFilerClient(ctx, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.withFilerClient(ctx, fs.grpcDialOption, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
@@ -89,11 +89,11 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri
return filename, header, readCloser, err
}
-func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
+ return fn(ctx2, client)
}, fs.grpcAddress, fs.grpcDialOption)
}