aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/master_client.go3
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/upload.go7
-rw-r--r--weed/mount/wfs_filer_client.go2
-rw-r--r--weed/mq/broker/broker_server.go5
-rw-r--r--weed/operation/grpc_client.go4
-rw-r--r--weed/pb/grpc_client_server.go37
-rw-r--r--weed/replication/sink/filersink/fetch_write.go5
-rw-r--r--weed/replication/source/filer_source.go2
-rw-r--r--weed/s3api/s3api_handlers.go5
-rw-r--r--weed/server/filer_grpc_server_admin.go5
-rw-r--r--weed/server/master_grpc_server_admin.go9
-rw-r--r--weed/server/master_server.go33
-rw-r--r--weed/server/volume_grpc_admin.go7
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_grpc_copy.go7
-rw-r--r--weed/server/webdav_server.go2
-rw-r--r--weed/shell/command_cluster_check.go7
-rw-r--r--weed/shell/command_collection_delete.go3
-rw-r--r--weed/wdclient/masterclient.go11
20 files changed, 99 insertions, 59 deletions
diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go
index d1e769c1d..15009e132 100644
--- a/weed/cluster/master_client.go
+++ b/weed/cluster/master_client.go
@@ -2,6 +2,7 @@ package cluster
import (
"context"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -10,7 +11,7 @@ import (
func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) {
- if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: clientType,
FilerGroup: filerGroup,
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 977e2a6b8..db2d8c42c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -572,7 +572,7 @@ func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, filerGrpcAddress, worker.options.grpcDialOption)
+ }, filerGrpcAddress, false, worker.options.grpcDialOption)
return
}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 389a72552..1f03f7b5a 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -4,11 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
+ "os"
+ "path/filepath"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"google.golang.org/grpc"
- "os"
- "path/filepath"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
@@ -130,7 +131,7 @@ func runUpload(cmd *Command, args []string) bool {
}
func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) {
- err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, masterAddress, grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go
index 020970df7..e991d8b39 100644
--- a/weed/mount/wfs_filer_client.go
+++ b/weed/mount/wfs_filer_client.go
@@ -25,7 +25,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, filerGrpcAddress, wfs.option.GrpcDialOption)
+ }, filerGrpcAddress, false, wfs.option.GrpcDialOption)
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index c107dbe45..89afb6e4d 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,11 +1,12 @@
package broker
import (
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
- "time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -100,7 +101,7 @@ func (broker *MessageQueueBroker) GetDataCenter() string {
func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index c06e501a5..c1f2bba82 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -13,7 +13,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, g
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, volumeServer.ToGrpcAddress(), grpcDialOption)
+ }, volumeServer.ToGrpcAddress(), false, grpcDialOption)
}
@@ -22,6 +22,6 @@ func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, g
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterServer.ToGrpcAddress(), grpcDialOption)
+ }, masterServer.ToGrpcAddress(), false, grpcDialOption)
}
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index a78ed0ca4..f3cca7fba 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -3,9 +3,6 @@ package pb
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
"math/rand"
"net/http"
"strconv"
@@ -13,6 +10,10 @@ import (
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@@ -65,15 +66,17 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
return grpc.NewServer(options...)
}
-func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// opts = append(opts, grpc.WithBlock())
// opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
var options []grpc.DialOption
+
options = append(options,
// grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(Max_Message_Size),
grpc.MaxCallRecvMsgSize(Max_Message_Size),
+ grpc.WaitForReady(waitForReady),
),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long
@@ -88,7 +91,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
return grpc.DialContext(ctx, address, options...)
}
-func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
+func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
grpcClientsLock.Lock()
defer grpcClientsLock.Unlock()
@@ -99,7 +102,7 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
}
ctx := context.Background()
- grpcConnection, err := GrpcDial(ctx, address, opts...)
+ grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
if err != nil {
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
}
@@ -115,10 +118,10 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG
}
// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
-func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
+func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
if !streamingMode {
- vgc, err := getOrCreateConnection(address, opts...)
+ vgc, err := getOrCreateConnection(address, waitForReady, opts...)
if err != nil {
return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
}
@@ -138,7 +141,7 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address
}
return executionErr
} else {
- grpcConnection, err := GrpcDial(context.Background(), address, opts...)
+ grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", address, err)
}
@@ -200,11 +203,11 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
return util.JoinHostPort(host, port)
}
-func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
+func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, master.ToGrpcAddress(), grpcDialOption)
+ }, master.ToGrpcAddress(), waitForReady, grpcDialOption)
}
@@ -212,7 +215,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, volumeServer.ToGrpcAddress(), grpcDialOption)
+ }, volumeServer.ToGrpcAddress(), false, grpcDialOption)
}
@@ -220,7 +223,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
- }, broker.ToGrpcAddress(), grpcDialOption)
+ }, broker.ToGrpcAddress(), false, grpcDialOption)
}
@@ -230,7 +233,7 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption)
+ }, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption)
if err == nil {
return nil
}
@@ -244,7 +247,7 @@ func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDial
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
- }, brokerGrpcAddress, grpcDialOption)
+ }, brokerGrpcAddress, false, grpcDialOption)
}
@@ -259,7 +262,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption)
+ }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption)
}
@@ -269,7 +272,7 @@ func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddres
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, filerAddress.ToGrpcAddress(), grpcDialOption)
+ }, filerAddress.ToGrpcAddress(), false, grpcDialOption)
if err == nil {
return nil
}
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d1a5d7ebd..c0321039b 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -2,9 +2,10 @@ package filersink
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util"
"sync"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -113,7 +114,7 @@ func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.Seawee
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.grpcAddress, fs.grpcDialOption)
+ }, fs.grpcAddress, false, fs.grpcDialOption)
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 6c69b735c..2da883ba6 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -131,7 +131,7 @@ func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.Seaw
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.grpcAddress, fs.grpcDialOption)
+ }, fs.grpcAddress, false, fs.grpcDialOption)
}
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 4e1dd09a7..b85ff485d 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -3,9 +3,10 @@ package s3api
import (
"encoding/base64"
"fmt"
+ "net/http"
+
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
- "net/http"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -18,7 +19,7 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption)
+ }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index 50c52c650..32cb2830d 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"fmt"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -10,7 +12,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "time"
)
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
@@ -66,7 +67,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
index 1dd89ad60..fb2c5bd50 100644
--- a/weed/server/master_grpc_server_admin.go
+++ b/weed/server/master_grpc_server_admin.go
@@ -3,15 +3,16 @@ package weed_server
import (
"context"
"fmt"
+ "math/rand"
+ "sync"
+ "time"
+
"github.com/seaweedfs/raft"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "math/rand"
- "sync"
- "time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -175,7 +176,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index fbc27e610..9adcafc6f 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,8 +1,8 @@
package weed_server
import (
+ "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/stats"
"net/http"
"net/http/httputil"
"net/url"
@@ -12,6 +12,8 @@ import (
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -242,7 +244,6 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
}
func (ms *MasterServer) startAdminScripts() {
-
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
if adminScripts == "" {
@@ -342,8 +343,10 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
- isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
- if update.IsAdd && isLeader {
+ if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
+ return
+ }
+ if update.IsAdd {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
@@ -356,5 +359,27 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
+ } else {
+ pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*72)
+ defer cancel()
+ if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil {
+ glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName)
+ if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ }); err != nil {
+ glog.Warningf("failed removing old raft server: %v", err)
+ return err
+ }
+ } else {
+ glog.V(0).Infof("master %s successfully responded to ping", peerName)
+ }
+
+ return nil
+ })
}
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c570ae2df..aace63fd8 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,13 +3,14 @@ package weed_server
import (
"context"
"fmt"
+ "path/filepath"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "path/filepath"
- "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -280,7 +281,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index c00524577..e55d821a8 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -94,7 +94,7 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption)
+ grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index d9928ed18..be152d246 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,13 +3,14 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/backend"
"io"
"math"
"os"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/backend"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -81,7 +82,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}()
var preallocateSize int64
- if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index eaa373dd0..b874ee9a2 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -127,7 +127,7 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
+ }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
}
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go
index fef371b49..2cabf91b8 100644
--- a/weed/shell/command_cluster_check.go
+++ b/weed/shell/command_cluster_check.go
@@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "io"
)
func init() {
@@ -97,7 +98,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, master := range masters {
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
- err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error {
pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
@@ -120,7 +121,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
continue
}
fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
- err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error {
pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(targetMaster),
TargetType: cluster.MasterType,
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 78a5b7157..936f35b46 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -4,8 +4,9 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"io"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
func init() {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 391dd9199..2583bda80 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -3,10 +3,11 @@ package wdclient
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/stats"
"math/rand"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
@@ -52,7 +53,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
@@ -114,7 +115,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
if master == myMasterAddress {
continue
}
- if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
@@ -150,7 +151,7 @@ func (mc *MasterClient) tryAllMasters() {
func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
- gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -271,7 +272,7 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.
for mc.currentMaster == "" {
time.Sleep(3 * time.Second)
}
- return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})