aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod5
-rw-r--r--go.sum6
-rw-r--r--weed/Makefile4
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/mount2.go83
-rw-r--r--weed/command/mount2_std.go208
-rw-r--r--weed/mount/directory.go42
-rw-r--r--weed/mount/directory_read.go84
-rw-r--r--weed/mount/inode_to_path.go61
-rw-r--r--weed/mount/unmount/unmount.go6
-rw-r--r--weed/mount/unmount/unmount_linux.go21
-rw-r--r--weed/mount/unmount/unmount_std.go18
-rw-r--r--weed/mount/weedfs.go144
-rw-r--r--weed/mount/weedfs_attr.go125
-rw-r--r--weed/mount/weedfs_attr_darwin.go8
-rw-r--r--weed/mount/weedfs_attr_linux.go9
-rw-r--r--weed/mount/weedfs_dir_lookup.go59
-rw-r--r--weed/mount/weedfs_dir_read.go93
-rw-r--r--weed/mount/weedfs_stats.go80
-rw-r--r--weed/mount/wfs_filer_client.go51
20 files changed, 1107 insertions, 1 deletions
diff --git a/go.mod b/go.mod
index 6176714c0..ffdc48dcc 100644
--- a/go.mod
+++ b/go.mod
@@ -162,7 +162,10 @@ require (
modernc.org/token v1.0.0 // indirect
)
-require github.com/fluent/fluent-logger-golang v1.8.0
+require (
+ github.com/fluent/fluent-logger-golang v1.8.0
+ github.com/hanwen/go-fuse/v2 v2.1.0
+)
require (
cloud.google.com/go/kms v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 7aa37b888..1054996bf 100644
--- a/go.sum
+++ b/go.sum
@@ -415,6 +415,10 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
+github.com/hanwen/go-fuse v1.0.0 h1:GxS9Zrn6c35/BnfiVsZVWmsG803xwE7eVRDvcf/BEVc=
+github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
+github.com/hanwen/go-fuse/v2 v2.1.0 h1:+32ffteETaLYClUj0a3aHjZ1hOPxxaNEHiZiujuDaek=
+github.com/hanwen/go-fuse/v2 v2.1.0/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
@@ -512,6 +516,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kurin/blazer v0.5.3 h1:SAgYv0TKU0kN/ETfO5ExjNAPyMt2FocO2s/UlCHfjAk=
github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU=
+github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4=
+github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
diff --git a/weed/Makefile b/weed/Makefile
index 4e871a71e..1d1a8476c 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -21,6 +21,10 @@ debug_mount:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
+debug_mount2:
+ go build -gcflags="all=-N -l"
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount2 -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
+
debug_server:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=~/tmp/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1
diff --git a/weed/command/command.go b/weed/command/command.go
index dbc18a053..c6665a7be 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -30,6 +30,7 @@ var Commands = []*Command{
cmdMaster,
cmdMasterFollower,
cmdMount,
+ cmdMount2,
cmdS3,
cmdIam,
cmdMsgBroker,
diff --git a/weed/command/mount2.go b/weed/command/mount2.go
new file mode 100644
index 000000000..d7b125c6f
--- /dev/null
+++ b/weed/command/mount2.go
@@ -0,0 +1,83 @@
+package command
+
+import (
+ "os"
+ "time"
+)
+
+type Mount2Options struct {
+ filer *string
+ filerMountRootPath *string
+ dir *string
+ dirAutoCreate *bool
+ collection *string
+ replication *string
+ diskType *string
+ ttlSec *int
+ chunkSizeLimitMB *int
+ concurrentWriters *int
+ cacheDir *string
+ cacheSizeMB *int64
+ dataCenter *string
+ allowOthers *bool
+ umaskString *string
+ nonempty *bool
+ volumeServerAccess *string
+ uidMap *string
+ gidMap *string
+ readOnly *bool
+ debug *bool
+ debugPort *int
+}
+
+var (
+ mount2Options Mount2Options
+)
+
+func init() {
+ cmdMount2.Run = runMount2 // break init cycle
+ mount2Options.filer = cmdMount2.Flag.String("filer", "localhost:8888", "comma-separated weed filer location")
+ mount2Options.filerMountRootPath = cmdMount2.Flag.String("filer.path", "/", "mount this remote path from filer server")
+ mount2Options.dir = cmdMount2.Flag.String("dir", ".", "mount weed filer to this directory")
+ mount2Options.dirAutoCreate = cmdMount2.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
+ mount2Options.collection = cmdMount2.Flag.String("collection", "", "collection to create the files")
+ mount2Options.replication = cmdMount2.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
+ mount2Options.diskType = cmdMount2.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ mount2Options.ttlSec = cmdMount2.Flag.Int("ttl", 0, "file ttl in seconds")
+ mount2Options.chunkSizeLimitMB = cmdMount2.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
+ mount2Options.concurrentWriters = cmdMount2.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
+ mount2Options.cacheDir = cmdMount2.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
+ mount2Options.cacheSizeMB = cmdMount2.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
+ mount2Options.dataCenter = cmdMount2.Flag.String("dataCenter", "", "prefer to write to the data center")
+ mount2Options.allowOthers = cmdMount2.Flag.Bool("allowOthers", true, "allows other users to access the file system")
+ mount2Options.umaskString = cmdMount2.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
+ mount2Options.nonempty = cmdMount2.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
+ mount2Options.volumeServerAccess = cmdMount2.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]")
+ mount2Options.uidMap = cmdMount2.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
+ mount2Options.gidMap = cmdMount2.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
+ mount2Options.readOnly = cmdMount2.Flag.Bool("readOnly", false, "read only")
+ mount2Options.debug = cmdMount2.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
+ mount2Options.debugPort = cmdMount2.Flag.Int("debug.port", 6061, "http port for debugging")
+
+ mountCpuProfile = cmdMount2.Flag.String("cpuprofile", "", "cpu profile output file")
+ mountMemProfile = cmdMount2.Flag.String("memprofile", "", "memory profile output file")
+ mountReadRetryTime = cmdMount2.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
+}
+
+var cmdMount2 = &Command{
+ UsageLine: "mount2 -filer=localhost:8888 -dir=/some/dir",
+ Short: "<WIP> mount weed filer to a directory as file system in userspace(FUSE)",
+ Long: `mount weed filer to userspace.
+
+ Pre-requisites:
+ 1) have SeaweedFS master and volume servers running
+ 2) have a "weed filer" running
+ These 2 requirements can be achieved with one command "weed server -filer=true"
+
+ This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on
+ Linux, and OS X.
+
+ On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
+
+ `,
+}
diff --git a/weed/command/mount2_std.go b/weed/command/mount2_std.go
new file mode 100644
index 000000000..cb2b46556
--- /dev/null
+++ b/weed/command/mount2_std.go
@@ -0,0 +1,208 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount"
+ "github.com/chrislusf/seaweedfs/weed/mount/unmount"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "net/http"
+ "os"
+ "os/user"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+)
+
+func runMount2(cmd *Command, args []string) bool {
+
+ if *mountOptions.debug {
+ go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
+ }
+
+ grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+ if *mountReadRetryTime < time.Second {
+ *mountReadRetryTime = time.Second
+ }
+ util.RetryWaitTime = *mountReadRetryTime
+
+ umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
+ if umaskErr != nil {
+ fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
+ return false
+ }
+
+ if len(args) > 0 {
+ return false
+ }
+
+ return RunMount2(&mount2Options, os.FileMode(umask))
+}
+
+func RunMount2(option *Mount2Options, umask os.FileMode) bool {
+
+ // basic checks
+ chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
+ if chunkSizeLimitMB <= 0 {
+ fmt.Printf("Please specify a reasonable buffer size.")
+ return false
+ }
+
+ // try to connect to filer
+ filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var cipher bool
+ var err error
+ for i := 0; i < 10; i++ {
+ err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
+ glog.V(0).Infof("wait for %d seconds ...", i+1)
+ time.Sleep(time.Duration(i+1) * time.Second)
+ }
+ }
+ if err != nil {
+ glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
+ return true
+ }
+
+ filerMountRootPath := *option.filerMountRootPath
+
+ // clean up mount point
+ dir := util.ResolvePath(*option.dir)
+ if dir == "" {
+ fmt.Printf("Please specify the mount directory via \"-dir\"")
+ return false
+ }
+
+ unmount.Unmount(dir)
+
+ // detect mount folder mode
+ if *option.dirAutoCreate {
+ os.MkdirAll(dir, os.FileMode(0777)&^umask)
+ }
+ fileInfo, err := os.Stat(dir)
+
+ // collect uid, gid
+ uid, gid := uint32(0), uint32(0)
+ mountMode := os.ModeDir | 0777
+ if err == nil {
+ mountMode = os.ModeDir | os.FileMode(0777)&^umask
+ uid, gid = util.GetFileUidGid(fileInfo)
+ fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
+ } else {
+ fmt.Printf("can not stat %s\n", dir)
+ return false
+ }
+
+ // detect uid, gid
+ if uid == 0 {
+ if u, err := user.Current(); err == nil {
+ if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
+ uid = uint32(parsedId)
+ }
+ if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
+ gid = uint32(parsedId)
+ }
+ fmt.Printf("current uid=%d gid=%d\n", uid, gid)
+ }
+ }
+
+ // mapping uid, gid
+ uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
+ if err != nil {
+ fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
+ return false
+ }
+
+ // Ensure target mount point availability
+ if isValid := checkMountPointAvailable(dir); !isValid {
+ glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ return true
+ }
+
+ // mount fuse
+ fuseMountOptions := &fuse.MountOptions{
+ AllowOther: *option.allowOthers,
+ Options: nil,
+ MaxBackground: 128,
+ MaxWrite: 1024 * 1024 * 2,
+ MaxReadAhead: 1024 * 1024 * 2,
+ IgnoreSecurityLabels: false,
+ RememberInodes: false,
+ FsName: *option.filer + ":" + filerMountRootPath,
+ Name: "seaweedfs",
+ SingleThreaded: false,
+ DisableXAttrs: false,
+ Debug: true,
+ EnableLocks: false,
+ ExplicitDataCacheControl: false,
+ // SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
+ DirectMount: true,
+ DirectMountFlags: 0,
+ // EnableAcl: false,
+ }
+
+ // find mount point
+ mountRoot := filerMountRootPath
+ if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
+ mountRoot = mountRoot[0 : len(mountRoot)-1]
+ }
+
+ seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
+ MountDirectory: dir,
+ FilerAddresses: filerAddresses,
+ GrpcDialOption: grpcDialOption,
+ FilerMountRootPath: mountRoot,
+ Collection: *option.collection,
+ Replication: *option.replication,
+ TtlSec: int32(*option.ttlSec),
+ DiskType: types.ToDiskType(*option.diskType),
+ ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ ConcurrentWriters: *option.concurrentWriters,
+ CacheDir: *option.cacheDir,
+ CacheSizeMB: *option.cacheSizeMB,
+ DataCenter: *option.dataCenter,
+ MountUid: uid,
+ MountGid: gid,
+ MountMode: mountMode,
+ MountCtime: fileInfo.ModTime(),
+ MountMtime: time.Now(),
+ Umask: umask,
+ VolumeServerAccess: *mountOptions.volumeServerAccess,
+ Cipher: cipher,
+ UidGidMapper: uidGidMapper,
+ })
+
+ server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
+ if err != nil {
+ glog.Fatalf("Mount fail: %v", err)
+ }
+ grace.OnInterrupt(func() {
+ unmount.Unmount(dir)
+ })
+
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
+
+ server.Serve()
+
+ return true
+}
diff --git a/weed/mount/directory.go b/weed/mount/directory.go
new file mode 100644
index 000000000..60fbafc37
--- /dev/null
+++ b/weed/mount/directory.go
@@ -0,0 +1,42 @@
+package mount
+
+import (
+ "bytes"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/hanwen/go-fuse/v2/fs"
+ "strings"
+)
+
+type Directory struct {
+ fs.Inode
+
+ name string
+ wfs *WFS
+ entry *filer_pb.Entry
+ parent *Directory
+ id uint64
+}
+
+func (dir *Directory) FullPath() string {
+ var parts []string
+ for p := dir; p != nil; p = p.parent {
+ if strings.HasPrefix(p.name, "/") {
+ if len(p.name) > 1 {
+ parts = append(parts, p.name[1:])
+ }
+ } else {
+ parts = append(parts, p.name)
+ }
+ }
+
+ if len(parts) == 0 {
+ return "/"
+ }
+
+ var buf bytes.Buffer
+ for i := len(parts) - 1; i >= 0; i-- {
+ buf.WriteString("/")
+ buf.WriteString(parts[i])
+ }
+ return buf.String()
+}
diff --git a/weed/mount/directory_read.go b/weed/mount/directory_read.go
new file mode 100644
index 000000000..51c51ae16
--- /dev/null
+++ b/weed/mount/directory_read.go
@@ -0,0 +1,84 @@
+package mount
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/hanwen/go-fuse/v2/fs"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "math"
+ "os"
+ "syscall"
+)
+
+var _ = fs.NodeReaddirer(&Directory{})
+var _ = fs.NodeGetattrer(&Directory{})
+
+func (dir *Directory) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
+ out.Mode = 0755
+ return 0
+}
+
+func (dir *Directory) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
+
+ dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("Readdir %s", dirPath)
+
+ sourceChan := make(chan fuse.DirEntry, 64)
+
+ stream := newDirectoryListStream(sourceChan)
+
+ processEachEntryFn := func(entry *filer.Entry, isLast bool) {
+ sourceChan <- fuse.DirEntry{
+ Mode: uint32(entry.Mode),
+ Name: entry.Name(),
+ Ino: dirPath.Child(entry.Name()).AsInode(os.ModeDir),
+ }
+ }
+
+ if err := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return nil, fs.ToErrno(os.ErrInvalid)
+ }
+ go func() {
+ dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry, false)
+ return true
+ })
+ close(sourceChan)
+ }()
+
+ return stream, fs.OK
+}
+
+var _ = fs.DirStream(&DirectoryListStream{})
+
+type DirectoryListStream struct {
+ next fuse.DirEntry
+ sourceChan chan fuse.DirEntry
+ isStarted bool
+ hasNext bool
+}
+
+func newDirectoryListStream(ch chan fuse.DirEntry) *DirectoryListStream {
+ return &DirectoryListStream{
+ sourceChan: ch,
+ }
+}
+
+func (i *DirectoryListStream) HasNext() bool {
+ if !i.isStarted {
+ i.next, i.hasNext = <-i.sourceChan
+ i.isStarted = true
+ }
+ return i.hasNext
+}
+func (i *DirectoryListStream) Next() (fuse.DirEntry, syscall.Errno) {
+ t := i.next
+ i.next, i.hasNext = <-i.sourceChan
+ return t, fs.OK
+}
+func (i *DirectoryListStream) Close() {
+}
diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go
new file mode 100644
index 000000000..04366ab0d
--- /dev/null
+++ b/weed/mount/inode_to_path.go
@@ -0,0 +1,61 @@
+package mount
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "sync"
+)
+
+type InodeToPath struct {
+ sync.RWMutex
+ nextInodeId uint64
+ inode2path map[uint64]util.FullPath
+ path2inode map[util.FullPath]uint64
+}
+
+func NewInodeToPath() *InodeToPath {
+ return &InodeToPath{
+ inode2path: make(map[uint64]util.FullPath),
+ path2inode: make(map[util.FullPath]uint64),
+ nextInodeId: 2, // the root inode id is 1
+ }
+}
+
+func (i *InodeToPath) GetInode(path util.FullPath) uint64 {
+ if path == "/" {
+ return 1
+ }
+ i.Lock()
+ defer i.Unlock()
+ inode, found := i.path2inode[path]
+ if !found {
+ inode = i.nextInodeId
+ i.nextInodeId++
+ i.path2inode[path] = inode
+ i.inode2path[inode] = path
+ }
+ return inode
+}
+
+func (i *InodeToPath) GetPath(inode uint64) util.FullPath {
+ if inode == 1 {
+ return "/"
+ }
+ i.RLock()
+ defer i.RUnlock()
+ path, found := i.inode2path[inode]
+ if !found {
+ glog.Fatal("not found inode %d", inode)
+ }
+ return path
+}
+
+func (i *InodeToPath) HasPath(path util.FullPath) bool {
+ if path == "/" {
+ return true
+ }
+ i.RLock()
+ defer i.RUnlock()
+ _, found := i.path2inode[path]
+ return found
+}
diff --git a/weed/mount/unmount/unmount.go b/weed/mount/unmount/unmount.go
new file mode 100644
index 000000000..c481d8030
--- /dev/null
+++ b/weed/mount/unmount/unmount.go
@@ -0,0 +1,6 @@
+package unmount
+
+// Unmount tries to unmount the filesystem mounted at dir.
+func Unmount(dir string) error {
+ return unmount(dir)
+}
diff --git a/weed/mount/unmount/unmount_linux.go b/weed/mount/unmount/unmount_linux.go
new file mode 100644
index 000000000..e55d48f86
--- /dev/null
+++ b/weed/mount/unmount/unmount_linux.go
@@ -0,0 +1,21 @@
+package unmount
+
+import (
+ "bytes"
+ "errors"
+ "os/exec"
+)
+
+func unmount(dir string) error {
+ cmd := exec.Command("fusermount", "-u", dir)
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ if len(output) > 0 {
+ output = bytes.TrimRight(output, "\n")
+ msg := err.Error() + ": " + string(output)
+ err = errors.New(msg)
+ }
+ return err
+ }
+ return nil
+}
diff --git a/weed/mount/unmount/unmount_std.go b/weed/mount/unmount/unmount_std.go
new file mode 100644
index 000000000..76267fb6a
--- /dev/null
+++ b/weed/mount/unmount/unmount_std.go
@@ -0,0 +1,18 @@
+//go:build !linux
+// +build !linux
+
+package unmount
+
+import (
+ "os"
+ "syscall"
+)
+
+func unmount(dir string) error {
+ err := syscall.Unmount(dir, 0)
+ if err != nil {
+ err = &os.PathError{Op: "unmount", Path: dir, Err: err}
+ return err
+ }
+ return nil
+}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
new file mode 100644
index 000000000..fa5c7d4f9
--- /dev/null
+++ b/weed/mount/weedfs.go
@@ -0,0 +1,144 @@
+package mount
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "google.golang.org/grpc"
+ "os"
+ "path"
+ "path/filepath"
+ "time"
+
+ "github.com/hanwen/go-fuse/v2/fs"
+)
+
+type Option struct {
+ MountDirectory string
+ FilerAddresses []pb.ServerAddress
+ filerIndex int
+ GrpcDialOption grpc.DialOption
+ FilerMountRootPath string
+ Collection string
+ Replication string
+ TtlSec int32
+ DiskType types.DiskType
+ ChunkSizeLimit int64
+ ConcurrentWriters int
+ CacheDir string
+ CacheSizeMB int64
+ DataCenter string
+ Umask os.FileMode
+
+ MountUid uint32
+ MountGid uint32
+ MountMode os.FileMode
+ MountCtime time.Time
+ MountMtime time.Time
+ MountParentInode uint64
+
+ VolumeServerAccess string // how to access volume servers
+ Cipher bool // whether encrypt data on volume server
+ UidGidMapper *meta_cache.UidGidMapper
+
+ uniqueCacheDir string
+ uniqueCacheTempPageDir string
+}
+
+type WFS struct {
+ // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
+ fuse.RawFileSystem
+ fs.Inode
+ option *Option
+ metaCache *meta_cache.MetaCache
+ stats statsCache
+ root Directory
+ signature int32
+ inodeToPath *InodeToPath
+}
+
+func NewSeaweedFileSystem(option *Option) *WFS {
+ wfs := &WFS{
+ RawFileSystem: fuse.NewDefaultRawFileSystem(),
+ option: option,
+ signature: util.RandomInt32(),
+ inodeToPath: NewInodeToPath(),
+ }
+
+ wfs.root = Directory{
+ name: "/",
+ wfs: wfs,
+ entry: nil,
+ parent: nil,
+ }
+
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
+ })
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+
+ return wfs
+}
+
+func (wfs *WFS) Root() *Directory {
+ return &wfs.root
+}
+
+func (wfs *WFS) String() string {
+ return "seaweedfs"
+}
+
+func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, entry *filer_pb.Entry, status fuse.Status) {
+ path = wfs.inodeToPath.GetPath(inode)
+ entry, status = wfs.maybeLoadEntry(path)
+ return
+}
+
+func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
+
+ // glog.V(3).Infof("read entry cache miss %s", fullpath)
+ dir, name := fullpath.DirAndName()
+
+ // return a valid entry for the mount root
+ if string(fullpath) == wfs.option.FilerMountRootPath {
+ return &filer_pb.Entry{
+ Name: name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: wfs.option.MountMtime.Unix(),
+ FileMode: uint32(wfs.option.MountMode),
+ Uid: wfs.option.MountUid,
+ Gid: wfs.option.MountGid,
+ Crtime: wfs.option.MountCtime.Unix(),
+ },
+ }, fuse.OK
+ }
+
+ // read from async meta cache
+ meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ return cachedEntry.ToProtoEntry(), fuse.ENOSYS
+}
+
+func (option *Option) setupUniqueCacheDirectory() {
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
+ option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
+ option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
+ os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
+}
+
+func (option *Option) getTempFilePageDir() string {
+ return option.uniqueCacheTempPageDir
+}
+func (option *Option) getUniqueCacheDir() string {
+ return option.uniqueCacheDir
+}
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
new file mode 100644
index 000000000..788badb77
--- /dev/null
+++ b/weed/mount/weedfs_attr.go
@@ -0,0 +1,125 @@
+package mount
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "os"
+ "syscall"
+ "time"
+)
+
+func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
+ println("input node id", input.NodeId)
+ if input.NodeId == 1 {
+ wfs.setRootAttr(out)
+ return fuse.OK
+ }
+
+ _, entry, status := wfs.maybeReadEntry(input.NodeId)
+ if status != fuse.OK {
+ return status
+ }
+ out.AttrValid = 1
+ wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
+
+ return fuse.OK
+}
+
+func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
+ return fuse.ENOSYS
+}
+func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string, dest []byte) (size uint32, code fuse.Status) {
+ return 0, fuse.ENOSYS
+}
+
+func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr string, data []byte) fuse.Status {
+ return fuse.ENOSYS
+}
+
+func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) {
+ return 0, fuse.ENOSYS
+}
+
+func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string) fuse.Status {
+ return fuse.ENOSYS
+}
+
+func (wfs *WFS) setRootAttr(out *fuse.AttrOut) {
+ now := uint64(time.Now().Second())
+ out.AttrValid = 119
+ out.Ino = 1
+ setBlksize(&out.Attr, blockSize)
+ out.Uid = wfs.option.MountUid
+ out.Gid = wfs.option.MountGid
+ out.Mtime = now
+ out.Ctime = now
+ out.Atime = now
+ out.Mode = toSystemType(os.ModeDir) | uint32(wfs.option.MountMode)
+ out.Nlink = 1
+}
+
+func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) {
+ out.Ino = inode
+ out.Uid = entry.Attributes.Uid
+ out.Gid = entry.Attributes.Gid
+ out.Mode = toSystemMode(os.FileMode(entry.Attributes.FileMode))
+ out.Mtime = uint64(entry.Attributes.Mtime)
+ out.Ctime = uint64(entry.Attributes.Mtime)
+ out.Atime = uint64(entry.Attributes.Mtime)
+ if entry.HardLinkCounter > 0 {
+ out.Nlink = uint32(entry.HardLinkCounter)
+ }
+ out.Size = filer.FileSize(entry)
+ out.Blocks = out.Size/blockSize + 1
+ setBlksize(out, blockSize)
+ out.Nlink = 1
+}
+
+func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.Entry) {
+ out.Ino = inode
+ out.Size = entry.FileSize
+ out.Blocks = out.Size/blockSize + 1
+ setBlksize(out, blockSize)
+ out.Atime = uint64(entry.Attr.Mtime.Unix())
+ out.Mtime = uint64(entry.Attr.Mtime.Unix())
+ out.Ctime = uint64(entry.Attr.Mtime.Unix())
+ out.Crtime_ = uint64(entry.Attr.Crtime.Unix())
+ out.Mode = toSystemMode(entry.Attr.Mode)
+ if entry.HardLinkCounter > 0 {
+ out.Nlink = uint32(entry.HardLinkCounter)
+ }
+ out.Nlink = 1
+ out.Uid = entry.Attr.Uid
+ out.Gid = entry.Attr.Gid
+}
+
+func (wfs *WFS) outputEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {
+ // out.Generation = 1
+ out.EntryValid = 1
+ out.AttrValid = 1
+ wfs.setAttrByFilerEntry(&out.Attr, inode, entry)
+}
+
+func toSystemMode(mode os.FileMode) uint32 {
+ return toSystemType(mode) | uint32(mode)
+}
+
+func toSystemType(mode os.FileMode) uint32 {
+ switch mode & os.ModeType {
+ case os.ModeDir:
+ return syscall.S_IFDIR
+ case os.ModeSymlink:
+ return syscall.S_IFLNK
+ case os.ModeNamedPipe:
+ return syscall.S_IFIFO
+ case os.ModeSocket:
+ return syscall.S_IFSOCK
+ case os.ModeDevice:
+ return syscall.S_IFBLK
+ case os.ModeCharDevice:
+ return syscall.S_IFCHR
+ default:
+ return syscall.S_IFREG
+ }
+}
diff --git a/weed/mount/weedfs_attr_darwin.go b/weed/mount/weedfs_attr_darwin.go
new file mode 100644
index 000000000..e7767d4a6
--- /dev/null
+++ b/weed/mount/weedfs_attr_darwin.go
@@ -0,0 +1,8 @@
+package mount
+
+import (
+ "github.com/hanwen/go-fuse/v2/fuse"
+)
+
+func setBlksize(out *fuse.Attr, size uint32) {
+}
diff --git a/weed/mount/weedfs_attr_linux.go b/weed/mount/weedfs_attr_linux.go
new file mode 100644
index 000000000..56be62e62
--- /dev/null
+++ b/weed/mount/weedfs_attr_linux.go
@@ -0,0 +1,9 @@
+package mount
+
+import (
+ "github.com/hanwen/go-fuse/v2/fuse"
+)
+
+func setBlksize(out *fuse.Attr, size uint32) {
+ out.Blksize = size
+}
diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go
new file mode 100644
index 000000000..672ba9711
--- /dev/null
+++ b/weed/mount/weedfs_dir_lookup.go
@@ -0,0 +1,59 @@
+package mount
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/hanwen/go-fuse/v2/fuse"
+)
+
+// Lookup is called by the kernel when the VFS wants to know
+// about a file inside a directory. Many lookup calls can
+// occur in parallel, but only one call happens for each (dir,
+// name) pair.
+
+func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name string, out *fuse.EntryOut) (code fuse.Status) {
+
+ dirPath := wfs.inodeToPath.GetPath(header.NodeId)
+
+ println("lookup", name, "dir inode", header.NodeId)
+
+ fullFilePath := dirPath.Child(name)
+
+ visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath)
+ if visitErr != nil {
+ glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
+ return fuse.EIO
+ }
+ localEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return fuse.ENOENT
+ }
+
+ if localEntry == nil {
+ // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
+ entry, err := filer_pb.GetEntry(wfs, fullFilePath)
+ if err != nil {
+ glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
+ return fuse.ENOENT
+ }
+ localEntry = filer.FromPbEntry(string(dirPath), entry)
+ } else {
+ glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
+ }
+
+ if localEntry == nil {
+ return fuse.ENOENT
+ }
+
+ inode := wfs.inodeToPath.GetInode(fullFilePath)
+
+ println("found", name, "inode", inode)
+
+ wfs.outputEntry(out, inode, localEntry)
+
+ return fuse.OK
+
+}
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
new file mode 100644
index 000000000..a696953a1
--- /dev/null
+++ b/weed/mount/weedfs_dir_read.go
@@ -0,0 +1,93 @@
+package mount
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "math"
+ "os"
+)
+
+// Directory handling
+
+func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut) (code fuse.Status) {
+ return fuse.OK
+}
+func (wfs *WFS) ReleaseDir(input *fuse.ReleaseIn) {
+}
+func (wfs *WFS) FsyncDir(cancel <-chan struct{}, input *fuse.FsyncIn) (code fuse.Status) {
+ return fuse.OK
+}
+
+func (wfs *WFS) ReadDir(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
+ return wfs.doReadDirectory(input, out, false)
+}
+
+func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
+ return wfs.doReadDirectory(input, out, true)
+}
+
+func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
+ dirPath := wfs.inodeToPath.GetPath(input.NodeId)
+
+ println("input size", input.Size, "offset", input.Offset, "pid", input.Caller.Pid)
+
+ var dirEntry fuse.DirEntry
+ if input.Offset == 0 {
+ dirEntry.Ino = input.NodeId
+ dirEntry.Name = "."
+ dirEntry.Mode = toSystemMode(os.ModeDir)
+ out.AddDirEntry(dirEntry)
+
+ parentDir, _ := dirPath.DirAndName()
+ parentInode := wfs.inodeToPath.GetInode(util.FullPath(parentDir))
+ dirEntry.Ino = parentInode
+ dirEntry.Name = ".."
+ dirEntry.Mode = toSystemMode(os.ModeDir)
+ out.AddDirEntry(dirEntry)
+
+ }
+
+ var counter uint64
+ processEachEntryFn := func(entry *filer.Entry, isLast bool) bool {
+ counter++
+ if counter <= input.Offset {
+ return true
+ }
+ dirEntry.Name = entry.Name()
+ inode := wfs.inodeToPath.GetInode(dirPath.Child(dirEntry.Name))
+ println("entry", dirEntry.Name, "inode", inode)
+ dirEntry.Ino = inode
+ dirEntry.Mode = toSystemMode(entry.Mode)
+ if !isPlusMode {
+ if !out.AddDirEntry(dirEntry) {
+ return false
+ }
+ } else {
+ entryOut := out.AddDirLookupEntry(dirEntry)
+ if entryOut == nil {
+ return false
+ }
+ wfs.outputEntry(entryOut, inode, entry)
+ }
+ return true
+ }
+
+ // TODO remove this with checking whether directory is not forgotten
+ if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return fuse.EIO
+ }
+ listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ return processEachEntryFn(entry, false)
+ })
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return fuse.EIO
+ }
+
+ return fuse.OK
+}
diff --git a/weed/mount/weedfs_stats.go b/weed/mount/weedfs_stats.go
new file mode 100644
index 000000000..3de561082
--- /dev/null
+++ b/weed/mount/weedfs_stats.go
@@ -0,0 +1,80 @@
+package mount
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "math"
+ "time"
+)
+
+const blockSize = 512
+
+type statsCache struct {
+ filer_pb.StatisticsResponse
+ lastChecked int64 // unix time in seconds
+}
+
+func (wfs *WFS) StatFs(cancel <-chan struct{}, in *fuse.InHeader, out *fuse.StatfsOut) (code fuse.Status) {
+
+ glog.V(4).Infof("reading fs stats")
+
+ if wfs.stats.lastChecked < time.Now().Unix()-20 {
+
+ err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.StatisticsRequest{
+ Collection: wfs.option.Collection,
+ Replication: wfs.option.Replication,
+ Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
+ DiskType: string(wfs.option.DiskType),
+ }
+
+ glog.V(4).Infof("reading filer stats: %+v", request)
+ resp, err := client.Statistics(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("reading filer stats %v: %v", request, err)
+ return err
+ }
+ glog.V(4).Infof("read filer stats: %+v", resp)
+
+ wfs.stats.TotalSize = resp.TotalSize
+ wfs.stats.UsedSize = resp.UsedSize
+ wfs.stats.FileCount = resp.FileCount
+ wfs.stats.lastChecked = time.Now().Unix()
+
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("filer Statistics: %v", err)
+ return fuse.OK
+ }
+ }
+
+ totalDiskSize := wfs.stats.TotalSize
+ usedDiskSize := wfs.stats.UsedSize
+ actualFileCount := wfs.stats.FileCount
+
+ // Compute the total number of available blocks
+ out.Blocks = totalDiskSize / blockSize
+
+ // Compute the number of used blocks
+ numBlocks := uint64(usedDiskSize / blockSize)
+
+ // Report the number of free and available blocks for the block size
+ out.Bfree = out.Blocks - numBlocks
+ out.Bavail = out.Blocks - numBlocks
+ out.Bsize = uint32(blockSize)
+
+ // Report the total number of possible files in the file system (and those free)
+ out.Files = math.MaxInt64
+ out.Ffree = math.MaxInt64 - actualFileCount
+
+ // Report the maximum length of a name and the minimum fragment size
+ out.NameLen = 1024
+ out.Frsize = uint32(blockSize)
+
+ return fuse.OK
+}
diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go
new file mode 100644
index 000000000..e8feb8342
--- /dev/null
+++ b/weed/mount/wfs_filer_client.go
@@ -0,0 +1,51 @@
+package mount
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+var _ = filer_pb.FilerClient(&WFS{})
+
+func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ return util.Retry("filer grpc", func() error {
+
+ i := wfs.option.filerIndex
+ n := len(wfs.option.FilerAddresses)
+ for x := 0; x < n; x++ {
+
+ filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
+ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, wfs.option.GrpcDialOption)
+
+ if err != nil {
+ glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
+ } else {
+ wfs.option.filerIndex = i
+ return nil
+ }
+
+ i++
+ if i >= n {
+ i = 0
+ }
+
+ }
+ return err
+ })
+
+}
+
+func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
+ if wfs.option.VolumeServerAccess == "publicUrl" {
+ return location.PublicUrl
+ }
+ return location.Url
+}