diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/wfs.go | 13 | ||||
| -rw-r--r-- | weed/filesys/wfs_filer_client.go | 39 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 2 |
3 files changed, 37 insertions, 17 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4096d3595..b634420d6 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -28,8 +28,9 @@ import ( type Option struct { MountDirectory string - FilerAddress string - FilerGrpcAddress string + FilerAddresses []string + filerIndex int + FilerGrpcAddresses []string GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string @@ -95,7 +96,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, signature: util.RandomInt32(), } - cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8] + cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8] cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask) @@ -259,11 +260,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { if wfs.option.VolumeServerAccess == "filerProxy" { return func(fileId string) (targetUrls []string, err error) { - return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil + return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil } } return filer.LookupFn(wfs) - +} +func (wfs *WFS) getCurrentFiler() string { + return wfs.option.FilerAddresses[wfs.option.filerIndex] } type NodeWithId uint64 diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index 671d20ba2..95ebdb9b8 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -1,6 +1,7 @@ package filesys import ( + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" @@ -10,19 +11,35 @@ import ( var _ = filer_pb.FilerClient(&WFS{}) -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { - err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) - }) + return util.Retry("filer grpc", func() error { - if err == nil { - return nil - } - return err + i := wfs.option.filerIndex + n := len(wfs.option.FilerGrpcAddresses) + for x := 0; x < n; x++ { + + filerGrpcAddress := wfs.option.FilerGrpcAddresses[i] + err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, wfs.option.GrpcDialOption) + + if err != nil { + glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) + } else { + wfs.option.filerIndex = i + return nil + } + + i++ + if i >= n { + i = 0 + } + + } + return err + }) } diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 730578202..42c13cfd0 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) if wfs.option.VolumeServerAccess == "filerProxy" { - fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId) + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId) } uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { |
