aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_cat.go2
-rw-r--r--weed/command/filer_copy.go14
-rw-r--r--weed/command/filer_meta_backup.go4
-rw-r--r--weed/command/filer_remote_gateway.go6
-rw-r--r--weed/command/filer_remote_sync.go4
-rw-r--r--weed/command/filer_remote_sync_dir.go2
-rw-r--r--weed/command/filer_sync.go4
-rw-r--r--weed/command/iam.go2
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/mount_std.go2
-rw-r--r--weed/command/msg_broker.go2
-rw-r--r--weed/command/s3.go2
-rw-r--r--weed/command/upload.go2
-rw-r--r--weed/command/webdav.go2
14 files changed, 25 insertions, 25 deletions
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index 71c3a48d6..7f613f72b 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -97,7 +97,7 @@ func runFilerCat(cmd *Command, args []string) bool {
writer = f
}
- pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 8a8701828..88ae55e84 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -172,7 +172,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -302,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
- err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@@ -344,7 +344,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
err = util.Retry("upload", func() error {
// assign a volume
- assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -404,7 +404,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
- if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -461,7 +461,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -540,7 +540,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
- if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -598,7 +598,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
var fileId, host string
var auth security.EncodedJwt
- if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 0b8fa76c6..d52ed3349 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -222,9 +222,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err
var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
-func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index 9426f3841..fa0239558 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -32,8 +32,8 @@ type RemoteGatewayOptions struct {
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
-func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
@@ -87,7 +87,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool {
remoteGatewayOptions.bucketsDir = "/buckets"
// check buckets again
- remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index bceeb097e..681ea35e9 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -22,8 +22,8 @@ type RemoteSyncOptions struct {
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
-func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 30782942e..947f526bb 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -227,7 +227,7 @@ func shouldSendToRemote(entry *filer_pb.Entry) bool {
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 20755dbe5..230b24a52 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -183,7 +183,7 @@ const (
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
- readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@@ -209,7 +209,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
- return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
diff --git a/weed/command/iam.go b/weed/command/iam.go
index ebe9657f2..8fb14be06 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -48,7 +48,7 @@ func (iamopt *IamOptions) startIamServer() bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index 95f1c80b8..6d7aa2848 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -87,7 +87,7 @@ func startMasterFollower(masterOptions MasterOptions) {
var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master grpc address %v configuration: %v", masters, err)
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 2603260a2..ce9a998f6 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -78,7 +78,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
var cipher bool
var err error
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 61517ab39..35d59ea20 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -68,7 +68,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
cipher := false
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d7cd7818d..ee726fcec 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -153,7 +153,7 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index f46e70cb1..f2b0b7fe4 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -130,7 +130,7 @@ func runUpload(cmd *Command, args []string) bool {
}
func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) {
- err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index bf4609d63..319302175 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -85,7 +85,7 @@ func (wo *WebDavOption) startWebDav() bool {
var cipher bool
// connect to filer
for {
- err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)