aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/replicator.go2
-rw-r--r--weed/replication/sink/filersink/fetch_write.go6
-rw-r--r--weed/replication/sink/filersink/filer_sink.go6
-rw-r--r--weed/replication/source/filer_source.go6
4 files changed, 10 insertions, 10 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 3e100497f..eaab2c13e 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -83,7 +83,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
}
func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) {
- if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil {
return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err)
} else {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 4c536b71c..825d2af95 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -70,7 +70,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var host string
var auth security.EncodedJwt
- if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -131,9 +131,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var _ = filer_pb.FilerClient(&FilerSink{})
-func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index d42ca63a8..c48ab2368 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -100,7 +100,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(key).DirAndName()
@@ -156,7 +156,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// read existing entry
var existingEntry *filer_pb.Entry
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
// save updated meta data
- return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: newParentPath,
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 60c33463f..4108f3821 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -56,7 +56,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
vid := volumeId(part)
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
@@ -118,9 +118,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
var _ = filer_pb.FilerClient(&FilerSource{})
-func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)