diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/command | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_cat.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 14 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 6 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 4 | ||||
| -rw-r--r-- | weed/command/iam.go | 2 | ||||
| -rw-r--r-- | weed/command/master_follower.go | 2 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 2 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 2 | ||||
| -rw-r--r-- | weed/command/s3.go | 2 | ||||
| -rw-r--r-- | weed/command/upload.go | 2 | ||||
| -rw-r--r-- | weed/command/webdav.go | 2 |
14 files changed, 25 insertions, 25 deletions
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 71c3a48d6..7f613f72b 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 8a8701828..88ae55e84 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err err = util.Retry("upload", func() error { // assign a volume - assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 0b8fa76c6..d52ed3349 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) -func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 9426f3841..fa0239558 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -32,8 +32,8 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) -func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool { remoteGatewayOptions.bucketsDir = "/buckets" // check buckets again - remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index bceeb097e..681ea35e9 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -22,8 +22,8 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) -func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 30782942e..947f526bb 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool { func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 20755dbe5..230b24a52 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -183,7 +183,7 @@ const ( func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index ebe9657f2..8fb14be06 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 95f1c80b8..6d7aa2848 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) { var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2603260a2..ce9a998f6 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { var cipher bool var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 61517ab39..35d59ea20 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { cipher := false for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index d7cd7818d..ee726fcec 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/upload.go b/weed/command/upload.go index f46e70cb1..f2b0b7fe4 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index bf4609d63..319302175 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) |
