diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_cat.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 14 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 6 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 4 | ||||
| -rw-r--r-- | weed/command/iam.go | 2 | ||||
| -rw-r--r-- | weed/command/master_follower.go | 2 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 2 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 2 | ||||
| -rw-r--r-- | weed/command/s3.go | 2 | ||||
| -rw-r--r-- | weed/command/upload.go | 2 | ||||
| -rw-r--r-- | weed/command/webdav.go | 2 |
14 files changed, 25 insertions, 25 deletions
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 71c3a48d6..7f613f72b 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 8a8701828..88ae55e84 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err err = util.Retry("upload", func() error { // assign a volume - assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 0b8fa76c6..d52ed3349 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) -func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 9426f3841..fa0239558 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -32,8 +32,8 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) -func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool { remoteGatewayOptions.bucketsDir = "/buckets" // check buckets again - remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index bceeb097e..681ea35e9 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -22,8 +22,8 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) -func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 30782942e..947f526bb 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool { func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 20755dbe5..230b24a52 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -183,7 +183,7 @@ const ( func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index ebe9657f2..8fb14be06 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 95f1c80b8..6d7aa2848 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) { var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2603260a2..ce9a998f6 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { var cipher bool var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 61517ab39..35d59ea20 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { cipher := false for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index d7cd7818d..ee726fcec 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/upload.go b/weed/command/upload.go index f46e70cb1..f2b0b7fe4 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index bf4609d63..319302175 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) |
