aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-21 01:28:00 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-21 01:28:00 -0700
commitdc1309f084bf9f420629bdf5c2cbe88c07400930 (patch)
treecbdcc53ffd2e8b1ddfe3e62c21c8dc94bb472f28
parent30c67e3652f0a04c77593849004cd6a138451263 (diff)
downloadseaweedfs-dc1309f084bf9f420629bdf5c2cbe88c07400930.tar.xz
seaweedfs-dc1309f084bf9f420629bdf5c2cbe88c07400930.zip
FUSE mount: support multiple filers
fix https://github.com/chrislusf/seaweedfs/issues/2015 fix https://github.com/chrislusf/seaweedfs/issues/1531
-rw-r--r--weed/command/mount_std.go20
-rw-r--r--weed/filesys/wfs.go13
-rw-r--r--weed/filesys/wfs_filer_client.go39
-rw-r--r--weed/filesys/wfs_write.go2
-rw-r--r--weed/pb/grpc_client_server.go25
5 files changed, 72 insertions, 27 deletions
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 2474cf7dd..e72a2f2cf 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -51,9 +51,9 @@ func runMount(cmd *Command, args []string) bool {
func RunMount(option *MountOptions, umask os.FileMode) bool {
- filer := *option.filer
+ filers := strings.Split(*option.filer, ",")
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer)
+ filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers)
if err != nil {
glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
return true
@@ -64,22 +64,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
for i := 0; i < 10; i++ {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
- glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
- glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err)
return true
}
@@ -145,7 +145,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options := []fuse.MountOption{
fuse.VolumeName(mountName),
- fuse.FSName(filer + ":" + filerMountRootPath),
+ fuse.FSName(*option.filer + ":" + filerMountRootPath),
fuse.Subtype("seaweedfs"),
// fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
@@ -181,8 +181,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
- FilerAddress: filer,
- FilerGrpcAddress: filerGrpcAddress,
+ FilerAddresses: filers,
+ FilerGrpcAddresses: filerGrpcAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
@@ -218,7 +218,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
c.Close()
})
- glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
+ glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
server := fs.New(c, nil)
seaweedFileSystem.Server = server
err = server.Serve(seaweedFileSystem)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 4096d3595..b634420d6 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -28,8 +28,9 @@ import (
type Option struct {
MountDirectory string
- FilerAddress string
- FilerGrpcAddress string
+ FilerAddresses []string
+ filerIndex int
+ FilerGrpcAddresses []string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
@@ -95,7 +96,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
signature: util.RandomInt32(),
}
- cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
@@ -259,11 +260,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
- return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
-
+}
+func (wfs *WFS) getCurrentFiler() string {
+ return wfs.option.FilerAddresses[wfs.option.filerIndex]
}
type NodeWithId uint64
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index 671d20ba2..95ebdb9b8 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -1,6 +1,7 @@
package filesys
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
@@ -10,19 +11,35 @@ import (
var _ = filer_pb.FilerClient(&WFS{})
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
- err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
- })
+ return util.Retry("filer grpc", func() error {
- if err == nil {
- return nil
- }
- return err
+ i := wfs.option.filerIndex
+ n := len(wfs.option.FilerGrpcAddresses)
+ for x := 0; x < n; x++ {
+
+ filerGrpcAddress := wfs.option.FilerGrpcAddresses[i]
+ err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, wfs.option.GrpcDialOption)
+
+ if err != nil {
+ glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
+ } else {
+ wfs.option.filerIndex = i
+ return nil
+ }
+
+ i++
+ if i >= n {
+ i = 0
+ }
+
+ }
+ return err
+ })
}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 730578202..42c13cfd0 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 9efcd9bdc..cdac0ba99 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -111,6 +111,16 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts
func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
return ParseServerAddress(server, 10000)
}
+func ParseServersToGrpcAddresses(servers []string) (serverGrpcAddresses []string, err error) {
+ for _, server := range servers {
+ if serverGrpcAddress, parseErr := ParseServerToGrpcAddress(server); parseErr == nil {
+ serverGrpcAddresses = append(serverGrpcAddresses, serverGrpcAddress)
+ } else {
+ return nil, parseErr
+ }
+ }
+ return
+}
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
@@ -202,3 +212,18 @@ func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption
}, filerGrpcAddress, grpcDialOption)
}
+
+func WithOneOfGrpcFilerClients(filerGrpcAddresses []string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
+
+ for _, filerGrpcAddress := range filerGrpcAddresses {
+ err = WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, grpcDialOption)
+ if err == nil {
+ return nil
+ }
+ }
+
+ return err
+}