aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-12-07 01:25:01 -0800
committerChris Lu <chris.lu@gmail.com>2018-12-07 01:25:01 -0800
commit29f1673d9766f11b256ca1c0d653aaa7d99e13aa (patch)
tree72c3967aa5338519effcd08c063e8d6f598fcdaa
parent6946c51430473739ae4819c30458ff7edd107bdd (diff)
downloadseaweedfs-29f1673d9766f11b256ca1c0d653aaa7d99e13aa.tar.xz
seaweedfs-29f1673d9766f11b256ca1c0d653aaa7d99e13aa.zip
refactoring
-rw-r--r--weed/filesys/wfs.go27
-rw-r--r--weed/operation/grpc_client.go44
-rw-r--r--weed/util/grpc_client_server.go37
3 files changed, 46 insertions, 62 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index e1f369e5b..bb655c256 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -40,10 +40,6 @@ type WFS struct {
pathToHandleIndex map[string]int
pathToHandleLock sync.Mutex
- // cache grpc connections
- grpcClients map[string]*grpc.ClientConn
- grpcClientsLock sync.Mutex
-
stats statsCache
}
type statsCache struct {
@@ -56,7 +52,6 @@ func NewSeaweedFileSystem(option *Option) *WFS {
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(int64(option.DirListingLimit) + 200).ItemsToPrune(100)),
pathToHandleIndex: make(map[string]int),
- grpcClients: make(map[string]*grpc.ClientConn),
}
}
@@ -66,27 +61,11 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- wfs.grpcClientsLock.Lock()
-
- existingConnection, found := wfs.grpcClients[wfs.option.FilerGrpcAddress]
- if found {
- wfs.grpcClientsLock.Unlock()
- client := filer_pb.NewSeaweedFilerClient(existingConnection)
+ return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }
-
- grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress)
- if err != nil {
- wfs.grpcClientsLock.Unlock()
- return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err)
- }
-
- wfs.grpcClients[wfs.option.FilerGrpcAddress] = grpcConnection
- wfs.grpcClientsLock.Unlock()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ }, wfs.option.FilerGrpcAddress)
- return fn(client)
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index b1d6a633e..300f78b58 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -25,27 +25,11 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume
return err
}
- grpcClientsLock.Lock()
-
- existingConnection, found := grpcClients[grpcAddress]
- if found {
- grpcClientsLock.Unlock()
- client := volume_server_pb.NewVolumeServerClient(existingConnection)
+ return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }
-
- grpcConnection, err := util.GrpcDial(grpcAddress)
- if err != nil {
- grpcClientsLock.Unlock()
- return fmt.Errorf("fail to dial %s: %v", grpcAddress, err)
- }
-
- grpcClients[grpcAddress] = grpcConnection
- grpcClientsLock.Unlock()
-
- client := volume_server_pb.NewVolumeServerClient(grpcConnection)
+ }, grpcAddress)
- return fn(client)
}
func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
@@ -60,25 +44,9 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
- grpcClientsLock.Lock()
-
- existingConnection, found := grpcClients[masterServer]
- if found {
- grpcClientsLock.Unlock()
- client := master_pb.NewSeaweedClient(existingConnection)
+ return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }
-
- grpcConnection, err := util.GrpcDial(masterServer)
- if err != nil {
- grpcClientsLock.Unlock()
- return fmt.Errorf("fail to dial %s: %v", masterServer, err)
- }
-
- grpcClients[masterServer] = grpcConnection
- grpcClientsLock.Unlock()
-
- client := master_pb.NewSeaweedClient(grpcConnection)
+ }, masterServer)
- return fn(client)
}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
index 8dbb4c0cd..18d5c02c9 100644
--- a/weed/util/grpc_client_server.go
+++ b/weed/util/grpc_client_server.go
@@ -1,12 +1,20 @@
package util
import (
+ "fmt"
+ "sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
+var (
+ // cache grpc connections
+ grpcClients = make(map[string]*grpc.ClientConn)
+ grpcClientsLock sync.Mutex
+)
+
func NewGrpcServer() *grpc.Server {
return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 10 * time.Second, // wait time before ping if no activity
@@ -26,3 +34,32 @@ func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
return grpc.Dial(address, opts...)
}
+
+func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
+
+ grpcClientsLock.Lock()
+
+ existingConnection, found := grpcClients[address]
+ if found {
+ grpcClientsLock.Unlock()
+ return fn(existingConnection)
+ }
+
+ grpcConnection, err := GrpcDial(address, opts...)
+ if err != nil {
+ grpcClientsLock.Unlock()
+ return fmt.Errorf("fail to dial %s: %v", address, err)
+ }
+
+ grpcClients[address] = grpcConnection
+ grpcClientsLock.Unlock()
+
+ err = fn(grpcConnection)
+ if err != nil {
+ grpcClientsLock.Lock()
+ delete(grpcClients, address)
+ grpcClientsLock.Unlock()
+ }
+
+ return err
+}