aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_admin.go5
-rw-r--r--weed/server/master_grpc_server_admin.go9
-rw-r--r--weed/server/master_server.go33
-rw-r--r--weed/server/volume_grpc_admin.go7
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_grpc_copy.go7
-rw-r--r--weed/server/webdav_server.go2
7 files changed, 47 insertions, 18 deletions
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index 50c52c650..32cb2830d 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"fmt"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -10,7 +12,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "time"
)
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
@@ -66,7 +67,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
index 1dd89ad60..fb2c5bd50 100644
--- a/weed/server/master_grpc_server_admin.go
+++ b/weed/server/master_grpc_server_admin.go
@@ -3,15 +3,16 @@ package weed_server
import (
"context"
"fmt"
+ "math/rand"
+ "sync"
+ "time"
+
"github.com/seaweedfs/raft"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "math/rand"
- "sync"
- "time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -175,7 +176,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index fbc27e610..9adcafc6f 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,8 +1,8 @@
package weed_server
import (
+ "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/stats"
"net/http"
"net/http/httputil"
"net/url"
@@ -12,6 +12,8 @@ import (
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -242,7 +244,6 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
}
func (ms *MasterServer) startAdminScripts() {
-
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
if adminScripts == "" {
@@ -342,8 +343,10 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
- isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
- if update.IsAdd && isLeader {
+ if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
+ return
+ }
+ if update.IsAdd {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
@@ -356,5 +359,27 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
+ } else {
+ pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*72)
+ defer cancel()
+ if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil {
+ glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName)
+ if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ }); err != nil {
+ glog.Warningf("failed removing old raft server: %v", err)
+ return err
+ }
+ } else {
+ glog.V(0).Infof("master %s successfully responded to ping", peerName)
+ }
+
+ return nil
+ })
}
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c570ae2df..aace63fd8 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,13 +3,14 @@ package weed_server
import (
"context"
"fmt"
+ "path/filepath"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "path/filepath"
- "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -280,7 +281,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ
})
}
if req.TargetType == cluster.MasterType {
- pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index c00524577..e55d821a8 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -94,7 +94,7 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption)
+ grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index d9928ed18..be152d246 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,13 +3,14 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/backend"
"io"
"math"
"os"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/backend"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -81,7 +82,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}()
var preallocateSize int64
- if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index eaa373dd0..b874ee9a2 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -127,7 +127,7 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
+ }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
}
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {