diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
110 files changed, 268 insertions, 262 deletions
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index e8361a6cf..0188d18d4 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -122,7 +122,7 @@ type needleState struct { func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleState, int64, error) { var idxFile *bytes.Reader - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ @@ -180,7 +180,7 @@ func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleS func getNeedleFileId(v uint32, nid types.NeedleId, addr pb.ServerAddress) (string, error) { var id string - err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{ VolumeId: v, NeedleId: uint64(nid), diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 8ccad1e49..faaea9e8d 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -51,7 +51,7 @@ func main() { } func startGenerateMetadata() { - pb.WithFilerClient(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error { for i := 0; i < *n; i++ { name := fmt.Sprintf("file%d", i) diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go index e120b9920..bbe5abedb 100644 --- a/unmaintained/stream_read_volume/stream_read_volume.go +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -15,9 +15,9 @@ import ( ) var ( - volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") - volumeId = flag.Int("volumeId", -1, "a volume id to stream read") - grpcDialOption grpc.DialOption + volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") + volumeId = flag.Int("volumeId", -1, "a volume id to stream read") + grpcDialOption grpc.DialOption ) func main() { @@ -33,11 +33,11 @@ func main() { return nil } - err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ - VolumeIds: []uint32{vid}, + VolumeIds: []uint32{vid}, }) if err != nil { return err @@ -61,4 +61,3 @@ func main() { } } - diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 71c3a48d6..7f613f72b 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 8a8701828..88ae55e84 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err err = util.Retry("upload", func() error { // assign a volume - assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 0b8fa76c6..d52ed3349 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) -func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 9426f3841..fa0239558 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -32,8 +32,8 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) -func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool { remoteGatewayOptions.bucketsDir = "/buckets" // check buckets again - remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index bceeb097e..681ea35e9 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -22,8 +22,8 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) -func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 30782942e..947f526bb 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool { func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 20755dbe5..230b24a52 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -183,7 +183,7 @@ const ( func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index ebe9657f2..8fb14be06 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 95f1c80b8..6d7aa2848 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) { var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2603260a2..ce9a998f6 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { var cipher bool var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 61517ab39..35d59ea20 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { cipher := false for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index d7cd7818d..ee726fcec 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/upload.go b/weed/command/upload.go index f46e70cb1..f2b0b7fe4 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index bf4609d63..319302175 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 9eabbc337..7ca198b38 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -79,9 +79,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress) { } -func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){ +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { - if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 1229f0139..45c368b9b 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -32,7 +32,7 @@ type FilerConf struct { func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) { var buf bytes.Buffer - if err := pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if masterClient != nil { return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf) } else { diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index cc33811aa..bda69b15f 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -139,7 +139,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou func (f *Filer) doDeleteCollection(collectionName string) (err error) { - return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: collectionName, }) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index bb2c947e5..282668146 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -72,7 +72,7 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { delete(ma.peerStatues, address) } } -func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) { +func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() _, isActive = ma.peerStatues[address] @@ -152,7 +152,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p for { glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs)) - err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ @@ -194,7 +194,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p } func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) { - err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index a3fb48ae0..6372dac72 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -26,7 +26,7 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot } func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: string(parent), Name: entry.Name, diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 68594cb03..b73526761 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -44,7 +44,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp if !found { util.Retry("lookup volume "+vid, func() error { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go index c95c4e5bd..b0534e2ca 100644 --- a/weed/filer/remote_mapping.go +++ b/weed/filer/remote_mapping.go @@ -11,7 +11,7 @@ import ( func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return readErr }); readErr != nil { @@ -30,7 +30,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor // read current mapping var oldContent, newContent []byte - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) @@ -47,7 +47,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor } // save back - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) }) if err != nil { @@ -61,7 +61,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error // read current mapping var oldContent, newContent []byte - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) @@ -78,7 +78,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error } // save back - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent) }) if err != nil { diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index 9d682b698..5362ba738 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte - if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index cedcf2d76..9d935e53c 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -201,7 +201,7 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex } glog.V(1).Infof("create %s/%s", dirFullPath, name) - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -242,7 +242,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err dirFullPath := dir.FullPath() - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(newEntry) defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) @@ -566,7 +566,7 @@ func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(entry) defer dir.wfs.mapPbIdFromFilerToLocal(entry) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index acdcd2de4..68f5a79e2 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -68,7 +68,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f } // apply changes to the filer, and also apply to local metaCache - err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -121,7 +121,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, Signatures: []int32{dir.wfs.signature}, } - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 1ee6922d8..d8ea3e459 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -22,7 +22,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) // update remote filer - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 767841f9d..bbc5c8e21 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -331,7 +331,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { } func (file *File) saveEntry(entry *filer_pb.Entry) error { - return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { file.wfs.mapPbIdFromLocalToFiler(entry) defer file.wfs.mapPbIdFromFilerToLocal(entry) @@ -362,7 +362,7 @@ func (file *File) getEntry() *filer_pb.Entry { } func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { - err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: file.dir.FullPath(), diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 2aa4de6da..607b901ff 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -277,7 +277,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { return nil } - err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { entry := fh.f.getEntry() if entry == nil { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index aa4f9dacd..127c160c4 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -197,7 +197,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index cce6aa1a1..4feef867e 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -11,7 +11,7 @@ import ( var _ = filer_pb.FilerClient(&WFS{}) -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { return util.Retry("filer grpc", func() error { @@ -20,7 +20,7 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err for x := 0; x < n; x++ { filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() - err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, wfs.option.GrpcDialOption) diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 61a463e88..17489547c 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -19,7 +19,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun var fileId, host string var auth security.EncodedJwt - if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index ad1a8c4a2..fc0e6b700 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -76,7 +76,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) { func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf); err != nil { return err } @@ -98,24 +98,20 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat if err := filer.ProtoToText(&buf, s3cfg); err != nil { return fmt.Errorf("ProtoToText: %s", err) } - return pb.WithGrpcFilerClient( - iam.option.Filer, - iam.option.GrpcDialOption, - func(client filer_pb.SeaweedFilerClient) error { - err = util.Retry("saveIamIdentity", func() error { - return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) - }) - if err != nil { - return err - } - return nil - }, - ) + return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("saveIamIdentity", func() error { + return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) + }) + if err != nil { + return err + } + return nil + }) } func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirecotry, filer.IamPoliciesFile, &buf); err != nil { return err } @@ -139,14 +135,10 @@ func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { if b, err = json.Marshal(policies); err != nil { return err } - return pb.WithGrpcFilerClient( - iam.option.Filer, - iam.option.GrpcDialOption, - func(client filer_pb.SeaweedFilerClient) error { - if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil { - return err - } - return nil - }, - ) + return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamPoliciesFile, b); err != nil { + return err + } + return nil + }) } diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 9958a0752..9a31a8ac0 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag dir, name := util.FullPath(targetFile).DirAndName() // append the chunk - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AppendToEntryRequest{ Directory: dir, @@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var assignResult = &operation.AssignResult{} // assign a volume location - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { assignErr := util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ @@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var _ = filer_pb.FilerClient(&MessageBroker{}) -func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { if err == io.EOF { return } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 66821d404..5cd8edd33 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ Resource: targetTopicPartition, }) @@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() { found := false for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err @@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() { found = false for !found { for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 193c1c689..acf2d6d34 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { for { for _, filer := range broker.option.Filers { - broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.KeepConnected(ctx) @@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { } -func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) } -func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { +func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index f0f7581f3..b716300e2 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -48,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, @@ -105,7 +105,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { - WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index d762f51e1..996c0b29e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -123,7 +123,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 743682203..9b68d2286 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -8,18 +8,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, volumeServer.ToGrpcAddress(), grpcDialOption) } -func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterServer.ToGrpcAddress(), grpcDialOption) diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index c222dff92..1eb5dd320 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -79,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index fdd22ac85..de71a198d 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -9,7 +9,7 @@ import ( func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index bedeeb3b5..d3449873b 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -28,7 +28,7 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle } func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 719039d88..266bdae51 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -20,7 +20,7 @@ var ( ) type FilerClient interface { - WithFilerClient(fn func(SeaweedFilerClient) error) error + WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error AdjustedUrl(location *Location) string } @@ -28,7 +28,7 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry dir, name := fullFilePath.DirAndName() - err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &LookupDirectoryEntryRequest{ Directory: dir, @@ -83,13 +83,13 @@ func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefi } func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit) }) } func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit) }) } @@ -157,7 +157,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) { - err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -185,7 +185,7 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { request := &UpdateEntryRequest{ Directory: parentDirectoryPath, @@ -204,7 +204,7 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string } func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return DoMkdir(client, parentDirectoryPath, dirName, fn) }) } @@ -241,7 +241,7 @@ func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName stri } func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { entry := &Entry{ Name: fileName, @@ -276,7 +276,7 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string } func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error { - return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error { return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures) }) } diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 9909e035b..1db9883a4 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -16,7 +16,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName, + err := WithFilerClient(true, filerAddress, grpcDialOption, makeFunc(clientName, pathPrefix, additionalPathPrefixes, &lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) @@ -28,7 +28,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, pathPrefix string, lastTsNs *int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) error { - err := filerClient.WithFilerClient(makeFunc(clientName, + err := filerClient.WithFilerClient(true, makeFunc(clientName, pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError)) if err != nil { return fmt.Errorf("subscribing filer meta change: %v", err) diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 08fb7fac7..532c66932 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -97,7 +97,8 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return existingConnection, nil } - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + ctx := context.Background() + grpcConnection, err := GrpcDial(ctx, address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -112,28 +113,42 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG return vgc, nil } -func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - vgc, err := getOrCreateConnection(address, opts...) - if err != nil { - return fmt.Errorf("getOrCreateConnection %s: %v", address, err) - } - executionErr := fn(vgc.ClientConn) - if executionErr != nil { - if strings.Contains(executionErr.Error(), "transport") || - strings.Contains(executionErr.Error(), "connection closed") { - grpcClientsLock.Lock() - if t, ok := grpcClients[address]; ok { - if t.version == vgc.version { - vgc.Close() - delete(grpcClients, address) + if !streamingMode { + vgc, err := getOrCreateConnection(address, opts...) + if err != nil { + return fmt.Errorf("getOrCreateConnection %s: %v", address, err) + } + executionErr := fn(vgc.ClientConn) + if executionErr != nil { + if strings.Contains(executionErr.Error(), "transport") || + strings.Contains(executionErr.Error(), "connection closed") { + grpcClientsLock.Lock() + if t, ok := grpcClients[address]; ok { + if t.version == vgc.version { + vgc.Close() + delete(grpcClients, address) + } } + grpcClientsLock.Unlock() } - grpcClientsLock.Unlock() } + return executionErr + } else { + grpcConnection, err := GrpcDial(context.Background(), address, opts...) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", address, err) + } + defer grpcConnection.Close() + executionErr := fn(grpcConnection) + if executionErr != nil { + return executionErr + } + return nil } - return executionErr } func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) { @@ -184,18 +199,18 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, master.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { +func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) @@ -207,34 +222,34 @@ func WithOneOfGrpcMasterClients(masterGrpcAddresses []ServerAddress, grpcDialOpt return err } -func WithBrokerGrpcClient(brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { +func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := messaging_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) }, brokerGrpcAddress, grpcDialOption) } -func WithFilerClient(filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithGrpcFilerClient(filer, grpcDialOption, fn) + return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn) } -func WithGrpcFilerClient(filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { - return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) } -func WithOneOfGrpcFilerClients(filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { +func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) { for _, filerAddress := range filerAddresses { - err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerAddress.ToGrpcAddress(), grpcDialOption) diff --git a/weed/remote_storage/track_sync_offset.go b/weed/remote_storage/track_sync_offset.go index 439491d12..25ac0d340 100644 --- a/weed/remote_storage/track_sync_offset.go +++ b/weed/remote_storage/track_sync_offset.go @@ -17,7 +17,7 @@ func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) @@ -46,7 +46,7 @@ func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s dirHash := uint32(util.HashStringToLong(dir)) - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(SyncKeyPrefix + "____") util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 3e100497f..eaab2c13e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -83,7 +83,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p } func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) { - if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil { return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err) } else { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 4c536b71c..825d2af95 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -70,7 +70,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var host string var auth security.EncodedJwt - if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -131,9 +131,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) var _ = filer_pb.FilerClient(&FilerSink{}) -func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index d42ca63a8..c48ab2368 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -100,7 +100,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(key).DirAndName() @@ -156,7 +156,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // read existing entry var existingEntry *filer_pb.Entry - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, @@ -211,7 +211,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // save updated meta data - return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: newParentPath, diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 60c33463f..4108f3821 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -56,7 +56,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) vid := volumeId(part) - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, @@ -118,9 +118,9 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea var _ = filer_pb.FilerClient(&FilerSource{}) -func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.grpcAddress, fs.grpcDialOption) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0d46ad7ca..87d478136 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -84,7 +84,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) { var content []byte - err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile) return err }) diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 888003e45..d227c609e 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -41,7 +41,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err != nil { diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go index 75d3b37d0..e45230165 100644 --- a/weed/s3api/filer_util_tags.go +++ b/weed/s3api/filer_util_tags.go @@ -13,7 +13,7 @@ const ( func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) { - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -35,7 +35,7 @@ func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (t func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, @@ -71,7 +71,7 @@ func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, ta func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) { - return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parentDirectoryPath, diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 247e33104..c3e50ec44 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -83,7 +83,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) // avoid duplicated buckets errCode := s3err.ErrNone - if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{ IncludeEcVolumes: true, IncludeNormalVolumes: true, @@ -146,7 +146,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return } - err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete collection deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index e42fb6c44..4ace4bb21 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -13,9 +13,9 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) -func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 2ac9c8102..d78332395 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -233,7 +233,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete file entries for _, object := range deleteObjects.Objects { diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 4decb5eac..a3b858dcb 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -146,7 +146,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m var nextMarker string // check filer - err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a0385f487..8e6cd8451 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -327,7 +327,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect glog.V(4).Infof("CollectionList %v", req) resp = &filer_pb.CollectionListResponse{} - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: req.IncludeNormalVolumes, IncludeEcVolumes: req.IncludeEcVolumes, @@ -348,7 +348,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet glog.V(4).Infof("DeleteCollection %v", req) - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) @@ -362,7 +362,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR var output *master_pb.StatisticsResponse - err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ Replication: req.Replication, Collection: req.Collection, diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 7f31d8cc1..3be986023 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -123,7 +123,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // tell filer to tell volume server to download into needles assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) - err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9343a9bf..1a5f80369 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -165,7 +165,7 @@ func (fs *FilerServer) checkWithMaster() { isConnected := false for !isConnected { for _, master := range fs.option.Masters { - readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index 55f3faf8c..654da6b3c 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 41a2b570b..72d4e20d7 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -27,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2659307fc..f3f99ee7b 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -26,7 +26,7 @@ func (vs *VolumeServer) GetMaster() pb.ServerAddress { func (vs *VolumeServer) checkWithMaster() (err error) { for { for _, master := range vs.SeedMasterNodes { - err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 9d9f756ce..52181a771 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -45,7 +45,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ @@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s if receiveErr == io.EOF { break } - if resp!=nil && resp.ModifiedTsNs != 0 { + if resp != nil && resp.ModifiedTsNs != 0 { modifiedTsNs = resp.ModifiedTsNs } if receiveErr != nil { diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 861d352d7..79611f499 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -126,7 +126,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 6c1de3154..018daed8b 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -120,9 +120,9 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { var _ = filer_pb.FilerClient(&WebDavFileSystem{}) -func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) @@ -162,7 +162,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -212,7 +212,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f } dir, name := util.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -315,7 +315,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) oldDir, oldBaseName := util.FullPath(oldName).DirAndName() newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -375,7 +375,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 var fileId, host string var auth security.EncodedJwt - if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -477,7 +477,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Chunks = manifestedChunks } - flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() f.entry.Attributes.Collection = f.collection f.entry.Attributes.Replication = f.replication diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 5ed1677c8..2d391de1d 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 8942c15da..55c8ddf19 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *collectionName, }) diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index ba502a6b9..b2aa6d2f4 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { var resp *master_pb.CollectionListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: includeNormalVolumes, IncludeEcVolumes: includeEcVolumes, diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 51c8c32cd..765c0ad89 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) targetAddress := pb.NewServerAddressFromDataNode(targetServer.info) - err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetAddress != existingLocation { @@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, @@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index b2ca605c7..288fa4945 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) @@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) { var resp *master_pb.LookupVolumeResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) return err }) @@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6add14749..fcdee264e 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 409ec4329..f5d1166d2 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index a5731240d..17e9c6550 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index ece805db3..73bb8e5c6 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes()) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index e0525defa..a7de6d3ef 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 46dc07e9a..7fe4cf809 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go index 11b663eec..71a35cb68 100644 --- a/weed/shell/command_fs_mkdir.go +++ b/weed/shell/command_fs_mkdir.go @@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri dir, name := util.FullPath(path).DirAndName() - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ Directory: dir, diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 2448c8f61..612cdc84e 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go index b383366ca..2ce3275bb 100644 --- a/weed/shell/command_fs_rm.go +++ b/weed/shell/command_fs_rm.go @@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer return fmt.Errorf("need to have arguments") } - commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { for _, entry := range entiries { targetPath, err := commandEnv.parseUrl(entry) if err != nil { diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 3fa905237..c892c3443 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error { - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: filer.DirectoryEtcRemote, @@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write return err } - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index 277c4c2be..e09a66761 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache) - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir) diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 2b57db707..019bc93c9 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error { func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() _, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 53d043ebf..a3433621e 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name)) - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: string(dir), Entry: entry, diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index c947a19e6..49b07f8f1 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go index a512ffc4a..35ecae4ee 100644 --- a/weed/shell/command_s3_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("empty bucket name") } - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index 26953c249..5f8585b0b 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer } // delete the collection directly first - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *bucketName, }) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 0c4e8d18f..a344a79a4 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i } func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 5eab2ebd0..cefb1deeb 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io } var buf bytes.Buffer - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf) }); err != nil && err != filer_pb.ErrNotFound { return err @@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) }); err != nil { return err diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 643cccac3..cf3bc604a 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { - return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 27cba618b..5c4c10146 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } for _, dst := range allLocations { - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ VolumeId: uint32(vid), Replication: replicaPlacement.String(), diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2885ba11f..43bf4d0f8 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co break } - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: replica.info.Id, SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index a7a981339..7a7cdee56 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) } - return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" if vinfo.isEcVolume { diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 575051ffe..7b03b8dfa 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 796f74264..d7cc8b8ce 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), }) @@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl } }() - err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ VolumeId: uint32(volumeId), }) @@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: string(sourceVolumeServer), @@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, @@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source } func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) @@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour } func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go index 4daa589be..029ef2201 100644 --- a/weed/shell/command_volume_server_leave.go +++ b/weed/shell/command_volume_server_leave.go @@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri } func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) { - return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{}) if leaveErr != nil { fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 57d3bf347..a2330ab8a 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index a22fe92a1..0df0790e6 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index d5cb9f07c..85bec44f7 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index 2e09a8c1b..a09bf5d56 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ return } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ GarbageThreshold: float32(*garbageThreshold), }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 02c0af59e..985f6423b 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error { var _ = filer_pb.FilerClient(&CommandEnv{}) -func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn) + return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn) } diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index caf8da859..0aa65f049 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -53,7 +53,7 @@ func RunShell(options ShellOptions) { if commandEnv.option.FilerAddress == "" { var filers []pb.ServerAddress - commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) @@ -75,7 +75,7 @@ func RunShell(options ShellOptions) { } if commandEnv.option.FilerAddress != "" { - commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 0e33a8e32..70e1593a0 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -239,7 +239,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId) - err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } @@ -287,7 +287,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { - err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index d40165ff5..398771a83 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -88,7 +88,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { - return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{ diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index a8ec50cad..67406aff6 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -73,7 +73,7 @@ func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption writeOffset := int64(startFromOffset) - err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 83043c23f..e1ae30c2d 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -15,7 +15,7 @@ type AllocateVolumeResult struct { func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { - return operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ VolumeId: uint32(vid), diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 40e3f70e7..74d70bcdb 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne errCount := int32(0) for index, dn := range locationlist.list { go func(index int, url pb.ServerAddress, vid needle.VolumeId) { - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), }) @@ -70,7 +70,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * for index, dn := range locationlist.list { go func(index int, url pb.ServerAddress, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) - err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: uint32(vid), Preallocate: preallocate, @@ -121,7 +121,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V isReadOnly := false for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) @@ -147,7 +147,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: uint32(vid), }) diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 725fa307d..1767ee4a4 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -54,7 +54,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { // retry to get the lease for { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -82,7 +82,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { defer cancel2() for l.isLocking { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -114,7 +114,7 @@ func (l *ExclusiveLocker) ReleaseLock() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 727d9cd34..672b3ac49 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -59,7 +59,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -96,7 +96,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master) - gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -172,12 +172,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL return } -func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { +func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) }) |
