diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_admin.go | 5 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_admin.go | 9 | ||||
| -rw-r--r-- | weed/server/master_server.go | 33 | ||||
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 7 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
7 files changed, 47 insertions, 18 deletions
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 50c52c650..32cb2830d 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -3,6 +3,8 @@ package weed_server import ( "context" "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -10,7 +12,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "time" ) func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { @@ -66,7 +67,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 1dd89ad60..fb2c5bd50 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,15 +3,16 @@ package weed_server import ( "context" "fmt" + "math/rand" + "sync" + "time" + "github.com/seaweedfs/raft" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "math/rand" - "sync" - "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -175,7 +176,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_server.go b/weed/server/master_server.go index fbc27e610..9adcafc6f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,8 +1,8 @@ package weed_server import ( + "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "net/http" "net/http/httputil" "net/url" @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -242,7 +244,6 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } func (ms *MasterServer) startAdminScripts() { - v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") if adminScripts == "" { @@ -342,8 +343,10 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) - isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd && isLeader { + if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader { + return + } + if update.IsAdd { raftServerFound := false for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { if string(server.ID) == peerName { @@ -356,5 +359,27 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerID(peerName), hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } + } else { + pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*72) + defer cancel() + if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil { + glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName) + if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }); err != nil { + glog.Warningf("failed removing old raft server: %v", err) + return err + } + } else { + glog.V(0).Infof("master %s successfully responded to ping", peerName) + } + + return nil + }) } } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index c570ae2df..aace63fd8 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" + "path/filepath" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "path/filepath" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -280,7 +281,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c00524577..e55d821a8 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -94,7 +94,7 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption) + grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index d9928ed18..be152d246 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" "io" "math" "os" "time" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -81,7 +82,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre }() var preallocateSize int64 - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index eaa373dd0..b874ee9a2 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -127,7 +127,7 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) + }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption) } func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { |
