aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-11-10 21:30:17 -0800
committerChris Lu <chris.lu@gmail.com>2021-11-10 21:30:17 -0800
commit10f1cdda0f0947c5676b5119c448675a4d3e8ef6 (patch)
tree14c2994c33027ec05fbfee9f0acdf86a8abcc02b /pkg
parent435b1557a2cf4b147660de8faae2fa40fb8253a2 (diff)
downloadseaweedfs-csi-driver-10f1cdda0f0947c5676b5119c448675a4d3e8ef6.tar.xz
seaweedfs-csi-driver-10f1cdda0f0947c5676b5119c448675a4d3e8ef6.zip
support multiple filers
fix https://github.com/seaweedfs/seaweedfs-csi-driver/issues/41
Diffstat (limited to 'pkg')
-rw-r--r--pkg/driver/driver.go36
-rw-r--r--pkg/driver/mounter_seaweedfs.go12
2 files changed, 39 insertions, 9 deletions
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index 9fb08ff..0286a92 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -35,7 +35,8 @@ type SeaweedFsDriver struct {
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
- filer pb.ServerAddress
+ filers []pb.ServerAddress
+ filerIndex int
grpcDialOption grpc.DialOption
ConcurrentWriters int
CacheSizeMB int64
@@ -55,7 +56,7 @@ func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
nodeID: nodeID,
name: driverName,
version: version,
- filer: pb.ServerAddress(filer),
+ filers: pb.ServerAddresses(filer).ToAddresses(),
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
}
@@ -129,10 +130,33 @@ var _ = filer_pb.FilerClient(&SeaweedFsDriver{})
func (d *SeaweedFsDriver) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, d.filer.ToGrpcAddress(), d.grpcDialOption)
+ return util.Retry("filer grpc", func() error {
+
+ i := d.filerIndex
+ n := len(d.filers)
+ var err error
+ for x := 0; x < n; x++ {
+
+ err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, d.filers[i].ToGrpcAddress(), d.grpcDialOption)
+
+ if err != nil {
+ glog.V(0).Infof("WithFilerClient %d %v: %v", x, d.filers[i], err)
+ } else {
+ d.filerIndex = i
+ return nil
+ }
+
+ i++
+ if i >= n {
+ i = 0
+ }
+
+ }
+ return err
+ })
}
func (d *SeaweedFsDriver) AdjustedUrl(location *filer_pb.Location) string {
diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go
index a35d702..2bf35fb 100644
--- a/pkg/driver/mounter_seaweedfs.go
+++ b/pkg/driver/mounter_seaweedfs.go
@@ -3,6 +3,7 @@ package driver
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "strings"
)
// Implements Mounter
@@ -29,7 +30,12 @@ func newSeaweedFsMounter(path string, collection string, readOnly bool, driver *
}
func (seaweedFs *seaweedFsMounter) Mount(target string) error {
- glog.V(0).Infof("mounting %s %s to %s", seaweedFs.driver.filer, seaweedFs.path, target)
+ glog.V(0).Infof("mounting %v %s to %s", seaweedFs.driver.filers, seaweedFs.path, target)
+
+ var filers []string
+ for _, address := range seaweedFs.driver.filers {
+ filers = append(filers, string(address))
+ }
args := []string{
"mount",
@@ -37,7 +43,7 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
"-umask=000",
fmt.Sprintf("-dir=%s", target),
fmt.Sprintf("-collection=%s", seaweedFs.collection),
- fmt.Sprintf("-filer=%s", seaweedFs.driver.filer),
+ fmt.Sprintf("-filer=%s", strings.Join(filers, ",")),
fmt.Sprintf("-filer.path=%s", seaweedFs.path),
fmt.Sprintf("-cacheCapacityMB=%d", seaweedFs.driver.CacheSizeMB),
}
@@ -75,7 +81,7 @@ func (seaweedFs *seaweedFsMounter) Mount(target string) error {
err := fuseMount(target, seaweedFsCmd, args)
if err != nil {
- glog.Errorf("mount %s %s to %s: %s", seaweedFs.driver.filer, seaweedFs.path, target, err)
+ glog.Errorf("mount %v %s to %s: %s", seaweedFs.driver.filers, seaweedFs.path, target, err)
}
return err
}