diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 6 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 6 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 6 |
4 files changed, 10 insertions, 10 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 3e100497f..eaab2c13e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -83,7 +83,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p } func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) { - if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil { return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err) } else { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 4c536b71c..825d2af95 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -70,7 +70,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var host string var auth security.EncodedJwt - if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -131,9 +131,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var _ = filer_pb.FilerClient(&FilerSink{}) -func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index d42ca63a8..c48ab2368 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -100,7 +100,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(key).DirAndName() @@ -156,7 +156,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // read existing entry var existingEntry *filer_pb.Entry - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // save updated meta data - return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath, diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 60c33463f..4108f3821 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -56,7 +56,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) vid := volumeId(part) - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, @@ -118,9 +118,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea var _ = filer_pb.FilerClient(&FilerSource{}) -func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) |
