aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/filer_copy.go3
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/server.go3
-rw-r--r--weed/filer2/filer_master.go4
-rw-r--r--weed/filesys/wfs.go4
-rw-r--r--weed/server/volume_grpc_client.go4
-rw-r--r--weed/util/grpc_client_server.go28
8 files changed, 38 insertions, 14 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 2d4696828..e5a3e379a 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"strings"
)
@@ -129,7 +128,7 @@ func (fo *FilerOptions) start() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := grpc.NewServer()
+ grpcS := util.NewGrpcServer()
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 3d4e5db9f..9937bc9d6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -16,7 +16,6 @@ import (
"strconv"
"io"
"time"
- "google.golang.org/grpc"
"context"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -340,7 +339,7 @@ func detectMimeType(f *os.File) string {
func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure())
+ grpcConnection, err := util.GrpcDial(filerAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index 8abec2a3d..c1b9cf5ae 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -14,7 +14,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
- "google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@@ -103,7 +102,7 @@ func runMaster(cmd *Command, args []string) bool {
httpL := m.Match(cmux.Any())
// Create your protocol servers.
- grpcS := grpc.NewServer()
+ grpcS := util.NewGrpcServer()
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
diff --git a/weed/command/server.go b/weed/command/server.go
index 606845199..485dea7ac 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -17,7 +17,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
- "google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@@ -208,7 +207,7 @@ func runServer(cmd *Command, args []string) bool {
httpL := m.Match(cmux.Any())
// Create your protocol servers.
- grpcS := grpc.NewServer()
+ grpcS := util.NewGrpcServer()
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go
index f69f68a85..51b12c237 100644
--- a/weed/filer2/filer_master.go
+++ b/weed/filer2/filer_master.go
@@ -7,7 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/glog"
- "google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *Filer) GetMaster() string {
@@ -48,7 +48,7 @@ func (fs *Filer) KeepConnectedToMaster() {
func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
- grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
+ grpcConnection, err := util.GrpcDial(master)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index a126bf3ea..d7e133483 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -5,10 +5,10 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/karlseguin/ccache"
- "google.golang.org/grpc"
"sync"
"bazil.org/fuse"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type WFS struct {
@@ -43,7 +43,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure())
+ grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
}
diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go
index 7688745e2..b3c755239 100644
--- a/weed/server/volume_grpc_client.go
+++ b/weed/server/volume_grpc_client.go
@@ -8,7 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"golang.org/x/net/context"
- "google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) GetMaster() string {
@@ -38,7 +38,7 @@ func (vs *VolumeServer) heartbeat() {
func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure())
+ grpcConection, err := util.GrpcDial(masterNode)
if err != nil {
return "", fmt.Errorf("fail to dial: %v", err)
}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
new file mode 100644
index 000000000..8dbb4c0cd
--- /dev/null
+++ b/weed/util/grpc_client_server.go
@@ -0,0 +1,28 @@
+package util
+
+import (
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/keepalive"
+)
+
+func NewGrpcServer() *grpc.Server {
+ return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
+ Time: 10 * time.Second, // wait time before ping if no activity
+ Timeout: 20 * time.Second, // ping timeout
+ }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+ MinTime: 60 * time.Second, // min time a client should wait before sending a ping
+ }))
+}
+
+func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+
+ opts = append(opts, grpc.WithInsecure())
+ opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
+ Time: 30 * time.Second, // client ping server if no activity for this long
+ Timeout: 20 * time.Second,
+ }))
+
+ return grpc.Dial(address, opts...)
+}