aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/diff_volume_servers/diff_volume_servers.go4
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go2
-rw-r--r--unmaintained/stream_read_volume/stream_read_volume.go11
-rw-r--r--weed/command/filer_cat.go2
-rw-r--r--weed/command/filer_copy.go14
-rw-r--r--weed/command/filer_meta_backup.go4
-rw-r--r--weed/command/filer_remote_gateway.go6
-rw-r--r--weed/command/filer_remote_sync.go4
-rw-r--r--weed/command/filer_remote_sync_dir.go2
-rw-r--r--weed/command/filer_sync.go4
-rw-r--r--weed/command/iam.go2
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/mount_std.go2
-rw-r--r--weed/command/msg_broker.go2
-rw-r--r--weed/command/s3.go2
-rw-r--r--weed/command/upload.go2
-rw-r--r--weed/command/webdav.go2
-rw-r--r--weed/filer/filer.go4
-rw-r--r--weed/filer/filer_conf.go2
-rw-r--r--weed/filer/filer_delete_entry.go2
-rw-r--r--weed/filer/meta_aggregator.go6
-rw-r--r--weed/filer/read_remote.go2
-rw-r--r--weed/filer/reader_at.go2
-rw-r--r--weed/filer/remote_mapping.go10
-rw-r--r--weed/filer/remote_storage.go2
-rw-r--r--weed/filesys/dir.go6
-rw-r--r--weed/filesys/dir_link.go4
-rw-r--r--weed/filesys/dir_rename.go2
-rw-r--r--weed/filesys/file.go4
-rw-r--r--weed/filesys/filehandle.go2
-rw-r--r--weed/filesys/wfs.go2
-rw-r--r--weed/filesys/wfs_filer_client.go4
-rw-r--r--weed/filesys/wfs_write.go2
-rw-r--r--weed/iamapi/iamapi_server.go42
-rw-r--r--weed/messaging/broker/broker_append.go8
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go6
-rw-r--r--weed/messaging/broker/broker_server.go10
-rw-r--r--weed/operation/assign_file_id.go4
-rw-r--r--weed/operation/delete_content.go2
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/operation/lookup.go2
-rw-r--r--weed/operation/sync_volume.go2
-rw-r--r--weed/operation/tail_volume.go2
-rw-r--r--weed/pb/filer_pb/filer_client.go18
-rw-r--r--weed/pb/filer_pb_tail.go4
-rw-r--r--weed/pb/grpc_client_server.go73
-rw-r--r--weed/remote_storage/track_sync_offset.go4
-rw-r--r--weed/replication/replicator.go2
-rw-r--r--weed/replication/sink/filersink/fetch_write.go6
-rw-r--r--weed/replication/sink/filersink/filer_sink.go6
-rw-r--r--weed/replication/source/filer_source.go6
-rw-r--r--weed/s3api/auth_credentials.go2
-rw-r--r--weed/s3api/filer_util.go2
-rw-r--r--weed/s3api/filer_util_tags.go6
-rw-r--r--weed/s3api/s3api_bucket_handlers.go4
-rw-r--r--weed/s3api/s3api_handlers.go4
-rw-r--r--weed/s3api/s3api_object_handlers.go2
-rw-r--r--weed/s3api/s3api_objects_list_handlers.go2
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/filer_grpc_server_remote.go2
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/master_grpc_server_collection.go4
-rw-r--r--weed/server/master_server_handlers_admin.go2
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_grpc_copy.go4
-rw-r--r--weed/server/volume_grpc_erasure_coding.go2
-rw-r--r--weed/server/webdav_server.go14
-rw-r--r--weed/shell/command_cluster_ps.go2
-rw-r--r--weed/shell/command_collection_delete.go2
-rw-r--r--weed/shell/command_collection_list.go2
-rw-r--r--weed/shell/command_ec_common.go8
-rw-r--r--weed/shell/command_ec_decode.go10
-rw-r--r--weed/shell/command_ec_encode.go2
-rw-r--r--weed/shell/command_ec_rebuild.go4
-rw-r--r--weed/shell/command_fs_cat.go2
-rw-r--r--weed/shell/command_fs_configure.go2
-rw-r--r--weed/shell/command_fs_meta_cat.go2
-rw-r--r--weed/shell/command_fs_meta_load.go2
-rw-r--r--weed/shell/command_fs_mkdir.go2
-rw-r--r--weed/shell/command_fs_mv.go2
-rw-r--r--weed/shell/command_fs_rm.go2
-rw-r--r--weed/shell/command_remote_configure.go4
-rw-r--r--weed/shell/command_remote_meta_sync.go2
-rw-r--r--weed/shell/command_remote_mount.go2
-rw-r--r--weed/shell/command_remote_uncache.go2
-rw-r--r--weed/shell/command_remote_unmount.go2
-rw-r--r--weed/shell/command_s3_bucket_create.go2
-rw-r--r--weed/shell/command_s3_bucket_delete.go2
-rw-r--r--weed/shell/command_s3_bucket_list.go2
-rw-r--r--weed/shell/command_s3_configure.go4
-rw-r--r--weed/shell/command_volume_check_disk.go6
-rw-r--r--weed/shell/command_volume_configure_replication.go2
-rw-r--r--weed/shell/command_volume_fix_replication.go2
-rw-r--r--weed/shell/command_volume_fsck.go2
-rw-r--r--weed/shell/command_volume_mount.go2
-rw-r--r--weed/shell/command_volume_move.go12
-rw-r--r--weed/shell/command_volume_server_leave.go2
-rw-r--r--weed/shell/command_volume_tier_download.go2
-rw-r--r--weed/shell/command_volume_tier_upload.go2
-rw-r--r--weed/shell/command_volume_unmount.go2
-rw-r--r--weed/shell/command_volume_vacuum.go2
-rw-r--r--weed/shell/commands.go4
-rw-r--r--weed/shell/shell_liner.go4
-rw-r--r--weed/storage/store_ec.go4
-rw-r--r--weed/storage/store_ec_delete.go2
-rw-r--r--weed/storage/volume_backup.go2
-rw-r--r--weed/topology/allocate_volume.go2
-rw-r--r--weed/topology/topology_vacuum.go8
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go6
-rw-r--r--weed/wdclient/masterclient.go8
110 files changed, 268 insertions, 262 deletions
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go
index e8361a6cf..0188d18d4 100644
--- a/unmaintained/diff_volume_servers/diff_volume_servers.go
+++ b/unmaintained/diff_volume_servers/diff_volume_servers.go
@@ -122,7 +122,7 @@ type needleState struct {
func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) {
var idxFile *bytes.Reader
- err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
@@ -180,7 +180,7 @@ func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleS
func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) {
var id string
- err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
VolumeId: v,
NeedleId: uint64(nid),
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index 8ccad1e49..faaea9e8d 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -51,7 +51,7 @@ func main() {
}
func startGenerateMetadata() {
- pb.WithFilerClient(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
+ pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
for i := 0; i < *n; i++ {
name := fmt.Sprintf("file%d", i)
diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go
index e120b9920..bbe5abedb 100644
--- a/unmaintained/stream_read_volume/stream_read_volume.go
+++ b/unmaintained/stream_read_volume/stream_read_volume.go
@@ -15,9 +15,9 @@ import (
)
var (
- volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server")
- volumeId = flag.Int("volumeId", -1, "a volume id to stream read")
- grpcDialOption grpc.DialOption
+ volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server")
+ volumeId = flag.Int("volumeId", -1, "a volume id to stream read")
+ grpcDialOption grpc.DialOption
)
func main() {
@@ -33,11 +33,11 @@ func main() {
return nil
}
- err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{
- VolumeIds: []uint32{vid},
+ VolumeIds: []uint32{vid},
})
if err != nil {
return err
@@ -61,4 +61,3 @@ func main() {
}
}
-
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index 71c3a48d6..7f613f72b 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool {
writer = f
}
- pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 8a8701828..88ae55e84 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
- err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
err = util.Retry("upload", func() error {
// assign a volume
- assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
- if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
- if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
var fileId, host string
var auth security.EncodedJwt
- if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 0b8fa76c6..d52ed3349 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err
var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
-func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index 9426f3841..fa0239558 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -32,8 +32,8 @@ type RemoteGatewayOptions struct {
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
-func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
@@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool {
remoteGatewayOptions.bucketsDir = "/buckets"
// check buckets again
- remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index bceeb097e..681ea35e9 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -22,8 +22,8 @@ type RemoteSyncOptions struct {
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
-func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 30782942e..947f526bb 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool {
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 20755dbe5..230b24a52 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -183,7 +183,7 @@ const (
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
- readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
- return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
diff --git a/weed/command/iam.go b/weed/command/iam.go
index ebe9657f2..8fb14be06 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index 95f1c80b8..6d7aa2848 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) {
var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master grpc address %v configuration: %v", masters, err)
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 2603260a2..ce9a998f6 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
var cipher bool
var err error
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 61517ab39..35d59ea20 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
cipher := false
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d7cd7818d..ee726fcec 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index f46e70cb1..f2b0b7fe4 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool {
}
func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) {
- err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index bf4609d63..319302175 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool {
var cipher bool
// connect to filer
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 9eabbc337..7ca198b38 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -79,9 +79,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress) {
}
-func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
- if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 1229f0139..45c368b9b 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -32,7 +32,7 @@ type FilerConf struct {
func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
var buf bytes.Buffer
- if err := pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if masterClient != nil {
return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
} else {
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index cc33811aa..bda69b15f 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -139,7 +139,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
func (f *Filer) doDeleteCollection(collectionName string) (err error) {
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index bb2c947e5..282668146 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -72,7 +72,7 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
delete(ma.peerStatues, address)
}
}
-func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) {
+func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
_, isActive = ma.peerStatues[address]
@@ -152,7 +152,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
for {
glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
- err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
@@ -194,7 +194,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
}
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
- err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
index a3fb48ae0..6372dac72 100644
--- a/weed/filer/read_remote.go
+++ b/weed/filer/read_remote.go
@@ -26,7 +26,7 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot
}
func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: string(parent),
Name: entry.Name,
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 68594cb03..b73526761 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -44,7 +44,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
if !found {
util.Retry("lookup volume "+vid, func() error {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go
index c95c4e5bd..b0534e2ca 100644
--- a/weed/filer/remote_mapping.go
+++ b/weed/filer/remote_mapping.go
@@ -11,7 +11,7 @@ import (
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {
@@ -30,7 +30,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor
// read current mapping
var oldContent, newContent []byte
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
@@ -47,7 +47,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor
}
// save back
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
@@ -61,7 +61,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error
// read current mapping
var oldContent, newContent []byte
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
@@ -78,7 +78,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error
}
// save back
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go
index 9d682b698..5362ba738 100644
--- a/weed/filer/remote_storage.go
+++ b/weed/filer/remote_storage.go
@@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index cedcf2d76..9d935e53c 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -201,7 +201,7 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex
}
glog.V(1).Infof("create %s/%s", dirFullPath, name)
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -242,7 +242,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
dirFullPath := dir.FullPath()
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(newEntry)
defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
@@ -566,7 +566,7 @@ func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
- return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(entry)
defer dir.wfs.mapPbIdFromFilerToLocal(entry)
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index acdcd2de4..68f5a79e2 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -68,7 +68,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
}
// apply changes to the filer, and also apply to local metaCache
- err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -121,7 +121,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
Signatures: []int32{dir.wfs.signature},
}
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index 1ee6922d8..d8ea3e459 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -22,7 +22,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
// update remote filer
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 767841f9d..bbc5c8e21 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -331,7 +331,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
func (file *File) saveEntry(entry *filer_pb.Entry) error {
- return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
file.wfs.mapPbIdFromLocalToFiler(entry)
defer file.wfs.mapPbIdFromFilerToLocal(entry)
@@ -362,7 +362,7 @@ func (file *File) getEntry() *filer_pb.Entry {
}
func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
- err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: file.dir.FullPath(),
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 2aa4de6da..607b901ff 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -277,7 +277,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
return nil
}
- err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
entry := fh.f.getEntry()
if entry == nil {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index aa4f9dacd..127c160c4 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -197,7 +197,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index cce6aa1a1..4feef867e 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -11,7 +11,7 @@ import (
var _ = filer_pb.FilerClient(&WFS{})
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
return util.Retry("filer grpc", func() error {
@@ -20,7 +20,7 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
- err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, wfs.option.GrpcDialOption)
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 61a463e88..17489547c 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -19,7 +19,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
var fileId, host string
var auth security.EncodedJwt
- if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go
index ad1a8c4a2..fc0e6b700 100644
--- a/weed/iamapi/iamapi_server.go
+++ b/weed/iamapi/iamapi_server.go
@@ -76,7 +76,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
var buf bytes.Buffer
- err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf); err != nil {
return err
}
@@ -98,24 +98,20 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("ProtoToText: %s", err)
}
- return pb.WithGrpcFilerClient(
- iam.option.Filer,
- iam.option.GrpcDialOption,
- func(client filer_pb.SeaweedFilerClient) error {
- err = util.Retry("saveIamIdentity", func() error {
- return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
- })
- if err != nil {
- return err
- }
- return nil
- },
- )
+ return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = util.Retry("saveIamIdentity", func() error {
+ return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
+ })
+ if err != nil {
+ return err
+ }
+ return nil
+ })
}
func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
var buf bytes.Buffer
- err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamPoliciesFile, &buf); err != nil {
return err
}
@@ -139,14 +135,10 @@ func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
if b, err = json.Marshal(policies); err != nil {
return err
}
- return pb.WithGrpcFilerClient(
- iam.option.Filer,
- iam.option.GrpcDialOption,
- func(client filer_pb.SeaweedFilerClient) error {
- if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil {
- return err
- }
- return nil
- },
- )
+ return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil {
+ return err
+ }
+ return nil
+ })
}
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 9958a0752..9a31a8ac0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
dir, name := util.FullPath(targetFile).DirAndName()
// append the chunk
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AppendToEntryRequest{
Directory: dir,
@@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var assignResult = &operation.AssignResult{}
// assign a volume location
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
assignErr := util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
@@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var _ = filer_pb.FilerClient(&MessageBroker{})
-func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
if err == io.EOF {
return
}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
index 66821d404..5cd8edd33 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
Resource: targetTopicPartition,
})
@@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() {
found := false
for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
@@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() {
found = false
for !found {
for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 193c1c689..acf2d6d34 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
for _, filer := range broker.option.Filers {
- broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index f0f7581f3..b716300e2 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -48,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
@@ -105,7 +105,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
- WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index d762f51e1..996c0b29e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -123,7 +123,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 743682203..9b68d2286 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -8,18 +8,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, volumeServer.ToGrpcAddress(), grpcDialOption)
}
-func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
+func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterServer.ToGrpcAddress(), grpcDialOption)
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index c222dff92..1eb5dd320 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -79,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
- err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index fdd22ac85..de71a198d 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -9,7 +9,7 @@ import (
func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
VolumeId: vid,
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index bedeeb3b5..d3449873b 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -28,7 +28,7 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle
}
func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 719039d88..266bdae51 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -20,7 +20,7 @@ var (
)
type FilerClient interface {
- WithFilerClient(fn func(SeaweedFilerClient) error) error
+ WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error
AdjustedUrl(location *Location) string
}
@@ -28,7 +28,7 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
dir, name := fullFilePath.DirAndName()
- err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
request := &LookupDirectoryEntryRequest{
Directory: dir,
@@ -83,13 +83,13 @@ func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefi
}
func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
})
}
func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
})
}
@@ -157,7 +157,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
- err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
request := &LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -185,7 +185,7 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin
func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
request := &UpdateEntryRequest{
Directory: parentDirectoryPath,
@@ -204,7 +204,7 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string
}
func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
return DoMkdir(client, parentDirectoryPath, dirName, fn)
})
}
@@ -241,7 +241,7 @@ func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName stri
}
func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
entry := &Entry{
Name: fileName,
@@ -276,7 +276,7 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string
}
func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
- return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
})
}
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
index 9909e035b..1db9883a4 100644
--- a/weed/pb/filer_pb_tail.go
+++ b/weed/pb/filer_pb_tail.go
@@ -16,7 +16,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption,
pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
+ err := WithFilerClient(true, filerAddress, grpcDialOption, makeFunc(clientName,
pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
@@ -28,7 +28,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
clientName string, pathPrefix string, lastTsNs *int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := filerClient.WithFilerClient(makeFunc(clientName,
+ err := filerClient.WithFilerClient(true, makeFunc(clientName,
pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 08fb7fac7..532c66932 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
return existingConnection, nil
}
- grpcConnection, err := GrpcDial(context.Background(), address, opts...)
+ ctx := context.Background()
+ grpcConnection, err := GrpcDial(ctx, address, opts...)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
}
@@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
return vgc, nil
}
-func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
+// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
+func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
- vgc, err := getOrCreateConnection(address, opts...)
- if err != nil {
- return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
- }
- executionErr := fn(vgc.ClientConn)
- if executionErr != nil {
- if strings.Contains(executionErr.Error(), "transport") ||
- strings.Contains(executionErr.Error(), "connection closed") {
- grpcClientsLock.Lock()
- if t, ok := grpcClients[address]; ok {
- if t.version == vgc.version {
- vgc.Close()
- delete(grpcClients, address)
+ if !streamingMode {
+ vgc, err := getOrCreateConnection(address, opts...)
+ if err != nil {
+ return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
+ }
+ executionErr := fn(vgc.ClientConn)
+ if executionErr != nil {
+ if strings.Contains(executionErr.Error(), "transport") ||
+ strings.Contains(executionErr.Error(), "connection closed") {
+ grpcClientsLock.Lock()
+ if t, ok := grpcClients[address]; ok {
+ if t.version == vgc.version {
+ vgc.Close()
+ delete(grpcClients, address)
+ }
}
+ grpcClientsLock.Unlock()
}
- grpcClientsLock.Unlock()
}
+ return executionErr
+ } else {
+ grpcConnection, err := GrpcDial(context.Background(), address, opts...)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", address, err)
+ }
+ defer grpcConnection.Close()
+ executionErr := fn(grpcConnection)
+ if executionErr != nil {
+ return executionErr
+ }
+ return nil
}
- return executionErr
}
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
@@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
return util.JoinHostPort(host, port)
}
-func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, master.ToGrpcAddress(), grpcDialOption)
}
-func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
+func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {
- err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress.ToGrpcAddress(), grpcDialOption)
@@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt
return err
}
-func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, grpcDialOption)
}
-func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
- return WithGrpcFilerClient(filer, grpcDialOption, fn)
+ return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn)
}
-func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
- return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress.ToGrpcAddress(), grpcDialOption)
}
-func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
+func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
for _, filerAddress := range filerAddresses {
- err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerAddress.ToGrpcAddress(), grpcDialOption)
diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go
index 439491d12..25ac0d340 100644
--- a/weed/remote_storage/track_sync_offset.go
+++ b/weed/remote_storage/track_sync_offset.go
@@ -17,7 +17,7 @@ func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s
dirHash := uint32(util.HashStringToLong(dir))
- readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
@@ -46,7 +46,7 @@ func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s
dirHash := uint32(util.HashStringToLong(dir))
- return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 3e100497f..eaab2c13e 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -83,7 +83,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
}
func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) {
- if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil {
return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err)
} else {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 4c536b71c..825d2af95 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -70,7 +70,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var host string
var auth security.EncodedJwt
- if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -131,9 +131,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var _ = filer_pb.FilerClient(&FilerSink{})
-func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index d42ca63a8..c48ab2368 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -100,7 +100,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(key).DirAndName()
@@ -156,7 +156,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// read existing entry
var existingEntry *filer_pb.Entry
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
@@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
// save updated meta data
- return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: newParentPath,
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 60c33463f..4108f3821 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -56,7 +56,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
vid := volumeId(part)
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
@@ -118,9 +118,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
var _ = filer_pb.FilerClient(&FilerSource{})
-func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 0d46ad7ca..87d478136 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -84,7 +84,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
var content []byte
- err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile)
return err
})
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 888003e45..d227c609e 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -41,7 +41,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err != nil {
diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go
index 75d3b37d0..e45230165 100644
--- a/weed/s3api/filer_util_tags.go
+++ b/weed/s3api/filer_util_tags.go
@@ -13,7 +13,7 @@ const (
func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
- err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -35,7 +35,7 @@ func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (t
func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -71,7 +71,7 @@ func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, ta
func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 247e33104..c3e50ec44 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -83,7 +83,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
// avoid duplicated buckets
errCode := s3err.ErrNone
- if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
IncludeNormalVolumes: true,
@@ -146,7 +146,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
return
}
- err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index e42fb6c44..4ace4bb21 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -13,9 +13,9 @@ import (
var _ = filer_pb.FilerClient(&S3ApiServer{})
-func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption)
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 2ac9c8102..d78332395 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -233,7 +233,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
- s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// delete file entries
for _, object := range deleteObjects.Objects {
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 4decb5eac..a3b858dcb 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -146,7 +146,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
var nextMarker string
// check filer
- err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
if entry.IsDirectory {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a0385f487..8e6cd8451 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -327,7 +327,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect
glog.V(4).Infof("CollectionList %v", req)
resp = &filer_pb.CollectionListResponse{}
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: req.IncludeNormalVolumes,
IncludeEcVolumes: req.IncludeEcVolumes,
@@ -348,7 +348,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
glog.V(4).Infof("DeleteCollection %v", req)
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
@@ -362,7 +362,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
var output *master_pb.StatisticsResponse
- err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
Replication: req.Replication,
Collection: req.Collection,
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 7f31d8cc1..3be986023 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -123,7 +123,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// tell filer to tell volume server to download into needles
assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
- err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c9343a9bf..1a5f80369 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -165,7 +165,7 @@ func (fs *FilerServer) checkWithMaster() {
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
- readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index 55f3faf8c..654da6b3c 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
@@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
- err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 41a2b570b..72d4e20d7 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -27,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 2659307fc..f3f99ee7b 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -26,7 +26,7 @@ func (vs *VolumeServer) GetMaster() pb.ServerAddress {
func (vs *VolumeServer) checkWithMaster() (err error) {
for {
for _, master := range vs.SeedMasterNodes {
- err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 9d9f756ce..52181a771 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -45,7 +45,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
- err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
@@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
if receiveErr == io.EOF {
break
}
- if resp!=nil && resp.ModifiedTsNs != 0 {
+ if resp != nil && resp.ModifiedTsNs != 0 {
modifiedTsNs = resp.ModifiedTsNs
}
if receiveErr != nil {
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 861d352d7..79611f499 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -126,7 +126,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 6c1de3154..018daed8b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -120,9 +120,9 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
var _ = filer_pb.FilerClient(&WebDavFileSystem{})
-func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
@@ -162,7 +162,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -212,7 +212,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
}
dir, name := util.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
@@ -315,7 +315,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
newDir, newBaseName := util.FullPath(newName).DirAndName()
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -375,7 +375,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
var fileId, host string
var auth security.EncodedJwt
- if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
@@ -477,7 +477,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.entry.Chunks = manifestedChunks
}
- flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
f.entry.Attributes.Collection = f.collection
f.entry.Attributes.Replication = f.replication
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 5ed1677c8..2d391de1d 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 8942c15da..55c8ddf19 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *collectionName,
})
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index ba502a6b9..b2aa6d2f4 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 51c8c32cd..765c0ad89 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
- err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetAddress != existingLocation {
@@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
@@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index b2ca605c7..288fa4945 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
@@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
@@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
@@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
var resp *master_pb.LookupVolumeResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
return err
})
@@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 6add14749..fcdee264e 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 409ec4329..f5d1166d2 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
- err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index a5731240d..17e9c6550 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index ece805db3..73bb8e5c6 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index e0525defa..a7de6d3ef 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 46dc07e9a..7fe4cf809 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go
index 11b663eec..71a35cb68 100644
--- a/weed/shell/command_fs_mkdir.go
+++ b/weed/shell/command_fs_mkdir.go
@@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri
dir, name := util.FullPath(path).DirAndName()
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: dir,
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index 2448c8f61..612cdc84e 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
destinationDir, destinationName := util.FullPath(destinationPath).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
index b383366ca..2ce3275bb 100644
--- a/weed/shell/command_fs_rm.go
+++ b/weed/shell/command_fs_rm.go
@@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return fmt.Errorf("need to have arguments")
}
- commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
for _, entry := range entiries {
targetPath, err := commandEnv.parseUrl(entry)
if err != nil {
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index 3fa905237..c892c3443 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error {
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: filer.DirectoryEtcRemote,
@@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write
return err
}
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data)
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go
index 277c4c2be..e09a66761 100644
--- a/weed/shell/command_remote_meta_sync.go
+++ b/weed/shell/command_remote_meta_sync.go
@@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util
remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 2b57db707..019bc93c9 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error {
func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
_, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
index 53d043ebf..a3433621e 100644
--- a/weed/shell/command_remote_uncache.go
+++ b/weed/shell/command_remote_uncache.go
@@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name))
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: string(dir),
Entry: entry,
diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go
index c947a19e6..49b07f8f1 100644
--- a/weed/shell/command_remote_unmount.go
+++ b/weed/shell/command_remote_unmount.go
@@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer
func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index a512ffc4a..35ecae4ee 100644
--- a/weed/shell/command_s3_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("empty bucket name")
}
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 26953c249..5f8585b0b 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
}
// delete the collection directly first
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *bucketName,
})
diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index 0c4e8d18f..a344a79a4 100644
--- a/weed/shell/command_s3_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
}
func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index 5eab2ebd0..cefb1deeb 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
}
var buf bytes.Buffer
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf)
}); err != nil && err != filer_pb.ErrNotFound {
return err
@@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
}); err != nil {
return err
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 643cccac3..cf3bc604a 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
- err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv
func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
- return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
- return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 27cba618b..5c4c10146 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 2885ba11f..43bf4d0f8 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index a7a981339..7a7cdee56 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
}
- return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
if vinfo.isEcVolume {
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 575051ffe..7b03b8dfa 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
}
func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 796f74264..d7cc8b8ce 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
})
@@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
}
}()
- err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(volumeId),
})
@@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
@@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
- return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
@@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
}
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
@@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
}
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
index 4daa589be..029ef2201 100644
--- a/weed/shell/command_volume_server_leave.go
+++ b/weed/shell/command_volume_server_leave.go
@@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri
}
func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
- return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {
fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 57d3bf347..a2330ab8a 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error {
- err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index a22fe92a1..0df0790e6 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index d5cb9f07c..85bec44f7 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
}
func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go
index 2e09a8c1b..a09bf5d56 100644
--- a/weed/shell/command_volume_vacuum.go
+++ b/weed/shell/command_volume_vacuum.go
@@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
return
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
})
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 02c0af59e..985f6423b 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error {
var _ = filer_pb.FilerClient(&CommandEnv{})
-func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
+ return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index caf8da859..0aa65f049 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -53,7 +53,7 @@ func RunShell(options ShellOptions) {
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress
- commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
@@ -75,7 +75,7 @@ func RunShell(options ShellOptions) {
}
if commandEnv.option.FilerAddress != "" {
- commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 0e33a8e32..70e1593a0 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -239,7 +239,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
- err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupEcVolumeRequest{
VolumeId: uint32(ecVolume.VolumeId),
}
@@ -287,7 +287,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne
func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
- err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index d40165ff5..398771a83 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -88,7 +88,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh
func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
- return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
_, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index a8ec50cad..67406aff6 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -73,7 +73,7 @@ func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption
writeOffset := int64(startFromOffset)
- err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
VolumeId: uint32(v.Id),
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 83043c23f..e1ae30c2d 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -15,7 +15,7 @@ type AllocateVolumeResult struct {
func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
VolumeId: uint32(vid),
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 40e3f70e7..74d70bcdb 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
errCount := int32(0)
for index, dn := range locationlist.list {
go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
- err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
})
@@ -70,7 +70,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
for index, dn := range locationlist.list {
go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumeId: uint32(vid),
Preallocate: preallocate,
@@ -121,7 +121,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
isReadOnly := false
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
@@ -147,7 +147,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 725fa307d..1767ee4a4 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -54,7 +54,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
// retry to get the lease
for {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
@@ -82,7 +82,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
defer cancel2()
for l.isLocking {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
@@ -114,7 +114,7 @@ func (l *ExclusiveLocker) ReleaseLock() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 727d9cd34..672b3ac49 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -59,7 +59,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
if master == myMasterAddress {
continue
}
- if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
@@ -96,7 +96,7 @@ func (mc *MasterClient) tryAllMasters() {
func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
- gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -172,12 +172,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
return
}
-func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
+func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
for mc.currentMaster == "" {
time.Sleep(3 * time.Second)
}
- return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})