aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_pages_temp_file.go11
-rw-r--r--weed/filesys/wfs.go37
-rw-r--r--weed/filesys/wfs_filer_client.go39
-rw-r--r--weed/filesys/wfs_write.go2
4 files changed, 59 insertions, 30 deletions
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 274b2a09e..3826008b7 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"os"
- "path/filepath"
"sync"
"time"
)
@@ -24,14 +23,6 @@ type TempFileDirtyPages struct {
replication string
}
-var (
- tmpDir = filepath.Join(os.TempDir(), "sw")
-)
-
-func init() {
- os.Mkdir(tmpDir, 0755)
-}
-
func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
@@ -49,7 +40,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
defer pages.pageAddLock.Unlock()
if pages.tf == nil {
- tf, err := os.CreateTemp(tmpDir, "")
+ tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
if err != nil {
glog.Errorf("create temp file: %v", err)
pages.lastErr = err
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 4096d3595..8f864a123 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -7,8 +7,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
+ "math/rand"
"os"
"path"
+ "path/filepath"
"sync"
"time"
@@ -28,8 +30,9 @@ import (
type Option struct {
MountDirectory string
- FilerAddress string
- FilerGrpcAddress string
+ FilerAddresses []string
+ filerIndex int
+ FilerGrpcAddresses []string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
@@ -52,6 +55,9 @@ type Option struct {
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
+
+ uniqueCacheDir string
+ uniqueCacheTempPageDir string
}
var _ = fs.FS(&WFS{})
@@ -95,14 +101,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
signature: util.RandomInt32(),
}
- cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
- cacheDir := path.Join(option.CacheDir, cacheUniqueId)
+ wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
+ wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
- os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
- wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
fsNode := NodeWithId(filePath.AsInode())
if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
@@ -259,11 +264,27 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
- return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
+}
+func (wfs *WFS) getCurrentFiler() string {
+ return wfs.option.FilerAddresses[wfs.option.filerIndex]
+}
+func (option *Option) setupUniqueCacheDirectory() {
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
+ option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
+ option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
+ os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
+}
+
+func (option *Option) getTempFilePageDir() string {
+ return option.uniqueCacheTempPageDir
+}
+func (option *Option) getUniqueCacheDir() string {
+ return option.uniqueCacheDir
}
type NodeWithId uint64
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index 671d20ba2..95ebdb9b8 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -1,6 +1,7 @@
package filesys
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
@@ -10,19 +11,35 @@ import (
var _ = filer_pb.FilerClient(&WFS{})
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
- err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
- })
+ return util.Retry("filer grpc", func() error {
- if err == nil {
- return nil
- }
- return err
+ i := wfs.option.filerIndex
+ n := len(wfs.option.FilerGrpcAddresses)
+ for x := 0; x < n; x++ {
+
+ filerGrpcAddress := wfs.option.FilerGrpcAddresses[i]
+ err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, wfs.option.GrpcDialOption)
+
+ if err != nil {
+ glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
+ } else {
+ wfs.option.filerIndex = i
+ return nil
+ }
+
+ i++
+ if i >= n {
+ i = 0
+ }
+
+ }
+ return err
+ })
}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 730578202..42c13cfd0 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {