diff options
| -rw-r--r-- | go.mod | 5 | ||||
| -rw-r--r-- | go.sum | 6 | ||||
| -rw-r--r-- | weed/Makefile | 4 | ||||
| -rw-r--r-- | weed/command/command.go | 1 | ||||
| -rw-r--r-- | weed/command/mount2.go | 83 | ||||
| -rw-r--r-- | weed/command/mount2_std.go | 208 | ||||
| -rw-r--r-- | weed/mount/directory.go | 42 | ||||
| -rw-r--r-- | weed/mount/directory_read.go | 84 | ||||
| -rw-r--r-- | weed/mount/inode_to_path.go | 61 | ||||
| -rw-r--r-- | weed/mount/unmount/unmount.go | 6 | ||||
| -rw-r--r-- | weed/mount/unmount/unmount_linux.go | 21 | ||||
| -rw-r--r-- | weed/mount/unmount/unmount_std.go | 18 | ||||
| -rw-r--r-- | weed/mount/weedfs.go | 144 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr.go | 125 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr_darwin.go | 8 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr_linux.go | 9 | ||||
| -rw-r--r-- | weed/mount/weedfs_dir_lookup.go | 59 | ||||
| -rw-r--r-- | weed/mount/weedfs_dir_read.go | 93 | ||||
| -rw-r--r-- | weed/mount/weedfs_stats.go | 80 | ||||
| -rw-r--r-- | weed/mount/wfs_filer_client.go | 51 |
20 files changed, 1107 insertions, 1 deletions
@@ -162,7 +162,10 @@ require ( modernc.org/token v1.0.0 // indirect ) -require github.com/fluent/fluent-logger-golang v1.8.0 +require ( + github.com/fluent/fluent-logger-golang v1.8.0 + github.com/hanwen/go-fuse/v2 v2.1.0 +) require ( cloud.google.com/go/kms v1.0.0 // indirect @@ -415,6 +415,10 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/hanwen/go-fuse v1.0.0 h1:GxS9Zrn6c35/BnfiVsZVWmsG803xwE7eVRDvcf/BEVc= +github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= +github.com/hanwen/go-fuse/v2 v2.1.0 h1:+32ffteETaLYClUj0a3aHjZ1hOPxxaNEHiZiujuDaek= +github.com/hanwen/go-fuse/v2 v2.1.0/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -512,6 +516,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kurin/blazer v0.5.3 h1:SAgYv0TKU0kN/ETfO5ExjNAPyMt2FocO2s/UlCHfjAk= github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU= +github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= +github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= diff --git a/weed/Makefile b/weed/Makefile index 4e871a71e..1d1a8476c 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -21,6 +21,10 @@ debug_mount: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000 +debug_mount2: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount2 -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000 + debug_server: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=~/tmp/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 diff --git a/weed/command/command.go b/weed/command/command.go index dbc18a053..c6665a7be 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -30,6 +30,7 @@ var Commands = []*Command{ cmdMaster, cmdMasterFollower, cmdMount, + cmdMount2, cmdS3, cmdIam, cmdMsgBroker, diff --git a/weed/command/mount2.go b/weed/command/mount2.go new file mode 100644 index 000000000..d7b125c6f --- /dev/null +++ b/weed/command/mount2.go @@ -0,0 +1,83 @@ +package command + +import ( + "os" + "time" +) + +type Mount2Options struct { + filer *string + filerMountRootPath *string + dir *string + dirAutoCreate *bool + collection *string + replication *string + diskType *string + ttlSec *int + chunkSizeLimitMB *int + concurrentWriters *int + cacheDir *string + cacheSizeMB *int64 + dataCenter *string + allowOthers *bool + umaskString *string + nonempty *bool + volumeServerAccess *string + uidMap *string + gidMap *string + readOnly *bool + debug *bool + debugPort *int +} + +var ( + mount2Options Mount2Options +) + +func init() { + cmdMount2.Run = runMount2 // break init cycle + mount2Options.filer = cmdMount2.Flag.String("filer", "localhost:8888", "comma-separated weed filer location") + mount2Options.filerMountRootPath = cmdMount2.Flag.String("filer.path", "/", "mount this remote path from filer server") + mount2Options.dir = cmdMount2.Flag.String("dir", ".", "mount weed filer to this directory") + mount2Options.dirAutoCreate = cmdMount2.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to") + mount2Options.collection = cmdMount2.Flag.String("collection", "", "collection to create the files") + mount2Options.replication = cmdMount2.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") + mount2Options.diskType = cmdMount2.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") + mount2Options.ttlSec = cmdMount2.Flag.Int("ttl", 0, "file ttl in seconds") + mount2Options.chunkSizeLimitMB = cmdMount2.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") + mount2Options.concurrentWriters = cmdMount2.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0") + mount2Options.cacheDir = cmdMount2.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") + mount2Options.cacheSizeMB = cmdMount2.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)") + mount2Options.dataCenter = cmdMount2.Flag.String("dataCenter", "", "prefer to write to the data center") + mount2Options.allowOthers = cmdMount2.Flag.Bool("allowOthers", true, "allows other users to access the file system") + mount2Options.umaskString = cmdMount2.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") + mount2Options.nonempty = cmdMount2.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory") + mount2Options.volumeServerAccess = cmdMount2.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]") + mount2Options.uidMap = cmdMount2.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>") + mount2Options.gidMap = cmdMount2.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>") + mount2Options.readOnly = cmdMount2.Flag.Bool("readOnly", false, "read only") + mount2Options.debug = cmdMount2.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") + mount2Options.debugPort = cmdMount2.Flag.Int("debug.port", 6061, "http port for debugging") + + mountCpuProfile = cmdMount2.Flag.String("cpuprofile", "", "cpu profile output file") + mountMemProfile = cmdMount2.Flag.String("memprofile", "", "memory profile output file") + mountReadRetryTime = cmdMount2.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time") +} + +var cmdMount2 = &Command{ + UsageLine: "mount2 -filer=localhost:8888 -dir=/some/dir", + Short: "<WIP> mount weed filer to a directory as file system in userspace(FUSE)", + Long: `mount weed filer to userspace. + + Pre-requisites: + 1) have SeaweedFS master and volume servers running + 2) have a "weed filer" running + These 2 requirements can be achieved with one command "weed server -filer=true" + + This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on + Linux, and OS X. + + On OS X, it requires OSXFUSE (http://osxfuse.github.com/). + + `, +} diff --git a/weed/command/mount2_std.go b/weed/command/mount2_std.go new file mode 100644 index 000000000..cb2b46556 --- /dev/null +++ b/weed/command/mount2_std.go @@ -0,0 +1,208 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount" + "github.com/chrislusf/seaweedfs/weed/mount/unmount" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/hanwen/go-fuse/v2/fuse" + "net/http" + "os" + "os/user" + "runtime" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" +) + +func runMount2(cmd *Command, args []string) bool { + + if *mountOptions.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil) + } + + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) + if *mountReadRetryTime < time.Second { + *mountReadRetryTime = time.Second + } + util.RetryWaitTime = *mountReadRetryTime + + umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64) + if umaskErr != nil { + fmt.Printf("can not parse umask %s", *mountOptions.umaskString) + return false + } + + if len(args) > 0 { + return false + } + + return RunMount2(&mount2Options, os.FileMode(umask)) +} + +func RunMount2(option *Mount2Options, umask os.FileMode) bool { + + // basic checks + chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB + if chunkSizeLimitMB <= 0 { + fmt.Printf("Please specify a reasonable buffer size.") + return false + } + + // try to connect to filer + filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses() + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + var cipher bool + var err error + for i := 0; i < 10; i++ { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err) + glog.V(0).Infof("wait for %d seconds ...", i+1) + time.Sleep(time.Duration(i+1) * time.Second) + } + } + if err != nil { + glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err) + return true + } + + filerMountRootPath := *option.filerMountRootPath + + // clean up mount point + dir := util.ResolvePath(*option.dir) + if dir == "" { + fmt.Printf("Please specify the mount directory via \"-dir\"") + return false + } + + unmount.Unmount(dir) + + // detect mount folder mode + if *option.dirAutoCreate { + os.MkdirAll(dir, os.FileMode(0777)&^umask) + } + fileInfo, err := os.Stat(dir) + + // collect uid, gid + uid, gid := uint32(0), uint32(0) + mountMode := os.ModeDir | 0777 + if err == nil { + mountMode = os.ModeDir | os.FileMode(0777)&^umask + uid, gid = util.GetFileUidGid(fileInfo) + fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode) + } else { + fmt.Printf("can not stat %s\n", dir) + return false + } + + // detect uid, gid + if uid == 0 { + if u, err := user.Current(); err == nil { + if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { + uid = uint32(parsedId) + } + if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { + gid = uint32(parsedId) + } + fmt.Printf("current uid=%d gid=%d\n", uid, gid) + } + } + + // mapping uid, gid + uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap) + if err != nil { + fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err) + return false + } + + // Ensure target mount point availability + if isValid := checkMountPointAvailable(dir); !isValid { + glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir) + return true + } + + // mount fuse + fuseMountOptions := &fuse.MountOptions{ + AllowOther: *option.allowOthers, + Options: nil, + MaxBackground: 128, + MaxWrite: 1024 * 1024 * 2, + MaxReadAhead: 1024 * 1024 * 2, + IgnoreSecurityLabels: false, + RememberInodes: false, + FsName: *option.filer + ":" + filerMountRootPath, + Name: "seaweedfs", + SingleThreaded: false, + DisableXAttrs: false, + Debug: true, + EnableLocks: false, + ExplicitDataCacheControl: false, + // SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability + DirectMount: true, + DirectMountFlags: 0, + // EnableAcl: false, + } + + // find mount point + mountRoot := filerMountRootPath + if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") { + mountRoot = mountRoot[0 : len(mountRoot)-1] + } + + seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{ + MountDirectory: dir, + FilerAddresses: filerAddresses, + GrpcDialOption: grpcDialOption, + FilerMountRootPath: mountRoot, + Collection: *option.collection, + Replication: *option.replication, + TtlSec: int32(*option.ttlSec), + DiskType: types.ToDiskType(*option.diskType), + ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, + ConcurrentWriters: *option.concurrentWriters, + CacheDir: *option.cacheDir, + CacheSizeMB: *option.cacheSizeMB, + DataCenter: *option.dataCenter, + MountUid: uid, + MountGid: gid, + MountMode: mountMode, + MountCtime: fileInfo.ModTime(), + MountMtime: time.Now(), + Umask: umask, + VolumeServerAccess: *mountOptions.volumeServerAccess, + Cipher: cipher, + UidGidMapper: uidGidMapper, + }) + + server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions) + if err != nil { + glog.Fatalf("Mount fail: %v", err) + } + grace.OnInterrupt(func() { + unmount.Unmount(dir) + }) + + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) + + server.Serve() + + return true +} diff --git a/weed/mount/directory.go b/weed/mount/directory.go new file mode 100644 index 000000000..60fbafc37 --- /dev/null +++ b/weed/mount/directory.go @@ -0,0 +1,42 @@ +package mount + +import ( + "bytes" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/hanwen/go-fuse/v2/fs" + "strings" +) + +type Directory struct { + fs.Inode + + name string + wfs *WFS + entry *filer_pb.Entry + parent *Directory + id uint64 +} + +func (dir *Directory) FullPath() string { + var parts []string + for p := dir; p != nil; p = p.parent { + if strings.HasPrefix(p.name, "/") { + if len(p.name) > 1 { + parts = append(parts, p.name[1:]) + } + } else { + parts = append(parts, p.name) + } + } + + if len(parts) == 0 { + return "/" + } + + var buf bytes.Buffer + for i := len(parts) - 1; i >= 0; i-- { + buf.WriteString("/") + buf.WriteString(parts[i]) + } + return buf.String() +} diff --git a/weed/mount/directory_read.go b/weed/mount/directory_read.go new file mode 100644 index 000000000..51c51ae16 --- /dev/null +++ b/weed/mount/directory_read.go @@ -0,0 +1,84 @@ +package mount + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/hanwen/go-fuse/v2/fs" + "github.com/hanwen/go-fuse/v2/fuse" + "math" + "os" + "syscall" +) + +var _ = fs.NodeReaddirer(&Directory{}) +var _ = fs.NodeGetattrer(&Directory{}) + +func (dir *Directory) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno { + out.Mode = 0755 + return 0 +} + +func (dir *Directory) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) { + + dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("Readdir %s", dirPath) + + sourceChan := make(chan fuse.DirEntry, 64) + + stream := newDirectoryListStream(sourceChan) + + processEachEntryFn := func(entry *filer.Entry, isLast bool) { + sourceChan <- fuse.DirEntry{ + Mode: uint32(entry.Mode), + Name: entry.Name(), + Ino: dirPath.Child(entry.Name()).AsInode(os.ModeDir), + } + } + + if err := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return nil, fs.ToErrno(os.ErrInvalid) + } + go func() { + dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + processEachEntryFn(entry, false) + return true + }) + close(sourceChan) + }() + + return stream, fs.OK +} + +var _ = fs.DirStream(&DirectoryListStream{}) + +type DirectoryListStream struct { + next fuse.DirEntry + sourceChan chan fuse.DirEntry + isStarted bool + hasNext bool +} + +func newDirectoryListStream(ch chan fuse.DirEntry) *DirectoryListStream { + return &DirectoryListStream{ + sourceChan: ch, + } +} + +func (i *DirectoryListStream) HasNext() bool { + if !i.isStarted { + i.next, i.hasNext = <-i.sourceChan + i.isStarted = true + } + return i.hasNext +} +func (i *DirectoryListStream) Next() (fuse.DirEntry, syscall.Errno) { + t := i.next + i.next, i.hasNext = <-i.sourceChan + return t, fs.OK +} +func (i *DirectoryListStream) Close() { +} diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go new file mode 100644 index 000000000..04366ab0d --- /dev/null +++ b/weed/mount/inode_to_path.go @@ -0,0 +1,61 @@ +package mount + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" +) + +type InodeToPath struct { + sync.RWMutex + nextInodeId uint64 + inode2path map[uint64]util.FullPath + path2inode map[util.FullPath]uint64 +} + +func NewInodeToPath() *InodeToPath { + return &InodeToPath{ + inode2path: make(map[uint64]util.FullPath), + path2inode: make(map[util.FullPath]uint64), + nextInodeId: 2, // the root inode id is 1 + } +} + +func (i *InodeToPath) GetInode(path util.FullPath) uint64 { + if path == "/" { + return 1 + } + i.Lock() + defer i.Unlock() + inode, found := i.path2inode[path] + if !found { + inode = i.nextInodeId + i.nextInodeId++ + i.path2inode[path] = inode + i.inode2path[inode] = path + } + return inode +} + +func (i *InodeToPath) GetPath(inode uint64) util.FullPath { + if inode == 1 { + return "/" + } + i.RLock() + defer i.RUnlock() + path, found := i.inode2path[inode] + if !found { + glog.Fatal("not found inode %d", inode) + } + return path +} + +func (i *InodeToPath) HasPath(path util.FullPath) bool { + if path == "/" { + return true + } + i.RLock() + defer i.RUnlock() + _, found := i.path2inode[path] + return found +} diff --git a/weed/mount/unmount/unmount.go b/weed/mount/unmount/unmount.go new file mode 100644 index 000000000..c481d8030 --- /dev/null +++ b/weed/mount/unmount/unmount.go @@ -0,0 +1,6 @@ +package unmount + +// Unmount tries to unmount the filesystem mounted at dir. +func Unmount(dir string) error { + return unmount(dir) +} diff --git a/weed/mount/unmount/unmount_linux.go b/weed/mount/unmount/unmount_linux.go new file mode 100644 index 000000000..e55d48f86 --- /dev/null +++ b/weed/mount/unmount/unmount_linux.go @@ -0,0 +1,21 @@ +package unmount + +import ( + "bytes" + "errors" + "os/exec" +) + +func unmount(dir string) error { + cmd := exec.Command("fusermount", "-u", dir) + output, err := cmd.CombinedOutput() + if err != nil { + if len(output) > 0 { + output = bytes.TrimRight(output, "\n") + msg := err.Error() + ": " + string(output) + err = errors.New(msg) + } + return err + } + return nil +} diff --git a/weed/mount/unmount/unmount_std.go b/weed/mount/unmount/unmount_std.go new file mode 100644 index 000000000..76267fb6a --- /dev/null +++ b/weed/mount/unmount/unmount_std.go @@ -0,0 +1,18 @@ +//go:build !linux +// +build !linux + +package unmount + +import ( + "os" + "syscall" +) + +func unmount(dir string) error { + err := syscall.Unmount(dir, 0) + if err != nil { + err = &os.PathError{Op: "unmount", Path: dir, Err: err} + return err + } + return nil +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go new file mode 100644 index 000000000..fa5c7d4f9 --- /dev/null +++ b/weed/mount/weedfs.go @@ -0,0 +1,144 @@ +package mount + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/hanwen/go-fuse/v2/fuse" + "google.golang.org/grpc" + "os" + "path" + "path/filepath" + "time" + + "github.com/hanwen/go-fuse/v2/fs" +) + +type Option struct { + MountDirectory string + FilerAddresses []pb.ServerAddress + filerIndex int + GrpcDialOption grpc.DialOption + FilerMountRootPath string + Collection string + Replication string + TtlSec int32 + DiskType types.DiskType + ChunkSizeLimit int64 + ConcurrentWriters int + CacheDir string + CacheSizeMB int64 + DataCenter string + Umask os.FileMode + + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time + MountParentInode uint64 + + VolumeServerAccess string // how to access volume servers + Cipher bool // whether encrypt data on volume server + UidGidMapper *meta_cache.UidGidMapper + + uniqueCacheDir string + uniqueCacheTempPageDir string +} + +type WFS struct { + // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go + fuse.RawFileSystem + fs.Inode + option *Option + metaCache *meta_cache.MetaCache + stats statsCache + root Directory + signature int32 + inodeToPath *InodeToPath +} + +func NewSeaweedFileSystem(option *Option) *WFS { + wfs := &WFS{ + RawFileSystem: fuse.NewDefaultRawFileSystem(), + option: option, + signature: util.RandomInt32(), + inodeToPath: NewInodeToPath(), + } + + wfs.root = Directory{ + name: "/", + wfs: wfs, + entry: nil, + parent: nil, + } + + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) { + }) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) + + return wfs +} + +func (wfs *WFS) Root() *Directory { + return &wfs.root +} + +func (wfs *WFS) String() string { + return "seaweedfs" +} + +func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, entry *filer_pb.Entry, status fuse.Status) { + path = wfs.inodeToPath.GetPath(inode) + entry, status = wfs.maybeLoadEntry(path) + return +} + +func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) { + + // glog.V(3).Infof("read entry cache miss %s", fullpath) + dir, name := fullpath.DirAndName() + + // return a valid entry for the mount root + if string(fullpath) == wfs.option.FilerMountRootPath { + return &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: wfs.option.MountMtime.Unix(), + FileMode: uint32(wfs.option.MountMode), + Uid: wfs.option.MountUid, + Gid: wfs.option.MountGid, + Crtime: wfs.option.MountCtime.Unix(), + }, + }, fuse.OK + } + + // read from async meta cache + meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + return cachedEntry.ToProtoEntry(), fuse.ENOSYS +} + +func (option *Option) setupUniqueCacheDirectory() { + cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8] + option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) + option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw") + os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask) +} + +func (option *Option) getTempFilePageDir() string { + return option.uniqueCacheTempPageDir +} +func (option *Option) getUniqueCacheDir() string { + return option.uniqueCacheDir +} diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go new file mode 100644 index 000000000..788badb77 --- /dev/null +++ b/weed/mount/weedfs_attr.go @@ -0,0 +1,125 @@ +package mount + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/hanwen/go-fuse/v2/fuse" + "os" + "syscall" + "time" +) + +func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) (code fuse.Status) { + println("input node id", input.NodeId) + if input.NodeId == 1 { + wfs.setRootAttr(out) + return fuse.OK + } + + _, entry, status := wfs.maybeReadEntry(input.NodeId) + if status != fuse.OK { + return status + } + out.AttrValid = 1 + wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry) + + return fuse.OK +} + +func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) { + return fuse.ENOSYS +} +func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string, dest []byte) (size uint32, code fuse.Status) { + return 0, fuse.ENOSYS +} + +func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr string, data []byte) fuse.Status { + return fuse.ENOSYS +} + +func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) { + return 0, fuse.ENOSYS +} + +func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string) fuse.Status { + return fuse.ENOSYS +} + +func (wfs *WFS) setRootAttr(out *fuse.AttrOut) { + now := uint64(time.Now().Second()) + out.AttrValid = 119 + out.Ino = 1 + setBlksize(&out.Attr, blockSize) + out.Uid = wfs.option.MountUid + out.Gid = wfs.option.MountGid + out.Mtime = now + out.Ctime = now + out.Atime = now + out.Mode = toSystemType(os.ModeDir) | uint32(wfs.option.MountMode) + out.Nlink = 1 +} + +func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) { + out.Ino = inode + out.Uid = entry.Attributes.Uid + out.Gid = entry.Attributes.Gid + out.Mode = toSystemMode(os.FileMode(entry.Attributes.FileMode)) + out.Mtime = uint64(entry.Attributes.Mtime) + out.Ctime = uint64(entry.Attributes.Mtime) + out.Atime = uint64(entry.Attributes.Mtime) + if entry.HardLinkCounter > 0 { + out.Nlink = uint32(entry.HardLinkCounter) + } + out.Size = filer.FileSize(entry) + out.Blocks = out.Size/blockSize + 1 + setBlksize(out, blockSize) + out.Nlink = 1 +} + +func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.Entry) { + out.Ino = inode + out.Size = entry.FileSize + out.Blocks = out.Size/blockSize + 1 + setBlksize(out, blockSize) + out.Atime = uint64(entry.Attr.Mtime.Unix()) + out.Mtime = uint64(entry.Attr.Mtime.Unix()) + out.Ctime = uint64(entry.Attr.Mtime.Unix()) + out.Crtime_ = uint64(entry.Attr.Crtime.Unix()) + out.Mode = toSystemMode(entry.Attr.Mode) + if entry.HardLinkCounter > 0 { + out.Nlink = uint32(entry.HardLinkCounter) + } + out.Nlink = 1 + out.Uid = entry.Attr.Uid + out.Gid = entry.Attr.Gid +} + +func (wfs *WFS) outputEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) { + // out.Generation = 1 + out.EntryValid = 1 + out.AttrValid = 1 + wfs.setAttrByFilerEntry(&out.Attr, inode, entry) +} + +func toSystemMode(mode os.FileMode) uint32 { + return toSystemType(mode) | uint32(mode) +} + +func toSystemType(mode os.FileMode) uint32 { + switch mode & os.ModeType { + case os.ModeDir: + return syscall.S_IFDIR + case os.ModeSymlink: + return syscall.S_IFLNK + case os.ModeNamedPipe: + return syscall.S_IFIFO + case os.ModeSocket: + return syscall.S_IFSOCK + case os.ModeDevice: + return syscall.S_IFBLK + case os.ModeCharDevice: + return syscall.S_IFCHR + default: + return syscall.S_IFREG + } +} diff --git a/weed/mount/weedfs_attr_darwin.go b/weed/mount/weedfs_attr_darwin.go new file mode 100644 index 000000000..e7767d4a6 --- /dev/null +++ b/weed/mount/weedfs_attr_darwin.go @@ -0,0 +1,8 @@ +package mount + +import ( + "github.com/hanwen/go-fuse/v2/fuse" +) + +func setBlksize(out *fuse.Attr, size uint32) { +} diff --git a/weed/mount/weedfs_attr_linux.go b/weed/mount/weedfs_attr_linux.go new file mode 100644 index 000000000..56be62e62 --- /dev/null +++ b/weed/mount/weedfs_attr_linux.go @@ -0,0 +1,9 @@ +package mount + +import ( + "github.com/hanwen/go-fuse/v2/fuse" +) + +func setBlksize(out *fuse.Attr, size uint32) { + out.Blksize = size +} diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go new file mode 100644 index 000000000..672ba9711 --- /dev/null +++ b/weed/mount/weedfs_dir_lookup.go @@ -0,0 +1,59 @@ +package mount + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/hanwen/go-fuse/v2/fuse" +) + +// Lookup is called by the kernel when the VFS wants to know +// about a file inside a directory. Many lookup calls can +// occur in parallel, but only one call happens for each (dir, +// name) pair. + +func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name string, out *fuse.EntryOut) (code fuse.Status) { + + dirPath := wfs.inodeToPath.GetPath(header.NodeId) + + println("lookup", name, "dir inode", header.NodeId) + + fullFilePath := dirPath.Child(name) + + visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath) + if visitErr != nil { + glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) + return fuse.EIO + } + localEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return fuse.ENOENT + } + + if localEntry == nil { + // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) + entry, err := filer_pb.GetEntry(wfs, fullFilePath) + if err != nil { + glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) + return fuse.ENOENT + } + localEntry = filer.FromPbEntry(string(dirPath), entry) + } else { + glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) + } + + if localEntry == nil { + return fuse.ENOENT + } + + inode := wfs.inodeToPath.GetInode(fullFilePath) + + println("found", name, "inode", inode) + + wfs.outputEntry(out, inode, localEntry) + + return fuse.OK + +} diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go new file mode 100644 index 000000000..a696953a1 --- /dev/null +++ b/weed/mount/weedfs_dir_read.go @@ -0,0 +1,93 @@ +package mount + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/hanwen/go-fuse/v2/fuse" + "math" + "os" +) + +// Directory handling + +func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut) (code fuse.Status) { + return fuse.OK +} +func (wfs *WFS) ReleaseDir(input *fuse.ReleaseIn) { +} +func (wfs *WFS) FsyncDir(cancel <-chan struct{}, input *fuse.FsyncIn) (code fuse.Status) { + return fuse.OK +} + +func (wfs *WFS) ReadDir(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) { + return wfs.doReadDirectory(input, out, false) +} + +func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) { + return wfs.doReadDirectory(input, out, true) +} + +func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status { + dirPath := wfs.inodeToPath.GetPath(input.NodeId) + + println("input size", input.Size, "offset", input.Offset, "pid", input.Caller.Pid) + + var dirEntry fuse.DirEntry + if input.Offset == 0 { + dirEntry.Ino = input.NodeId + dirEntry.Name = "." + dirEntry.Mode = toSystemMode(os.ModeDir) + out.AddDirEntry(dirEntry) + + parentDir, _ := dirPath.DirAndName() + parentInode := wfs.inodeToPath.GetInode(util.FullPath(parentDir)) + dirEntry.Ino = parentInode + dirEntry.Name = ".." + dirEntry.Mode = toSystemMode(os.ModeDir) + out.AddDirEntry(dirEntry) + + } + + var counter uint64 + processEachEntryFn := func(entry *filer.Entry, isLast bool) bool { + counter++ + if counter <= input.Offset { + return true + } + dirEntry.Name = entry.Name() + inode := wfs.inodeToPath.GetInode(dirPath.Child(dirEntry.Name)) + println("entry", dirEntry.Name, "inode", inode) + dirEntry.Ino = inode + dirEntry.Mode = toSystemMode(entry.Mode) + if !isPlusMode { + if !out.AddDirEntry(dirEntry) { + return false + } + } else { + entryOut := out.AddDirLookupEntry(dirEntry) + if entryOut == nil { + return false + } + wfs.outputEntry(entryOut, inode, entry) + } + return true + } + + // TODO remove this with checking whether directory is not forgotten + if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return fuse.EIO + } + listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + return processEachEntryFn(entry, false) + }) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return fuse.EIO + } + + return fuse.OK +} diff --git a/weed/mount/weedfs_stats.go b/weed/mount/weedfs_stats.go new file mode 100644 index 000000000..3de561082 --- /dev/null +++ b/weed/mount/weedfs_stats.go @@ -0,0 +1,80 @@ +package mount + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/hanwen/go-fuse/v2/fuse" + "math" + "time" +) + +const blockSize = 512 + +type statsCache struct { + filer_pb.StatisticsResponse + lastChecked int64 // unix time in seconds +} + +func (wfs *WFS) StatFs(cancel <-chan struct{}, in *fuse.InHeader, out *fuse.StatfsOut) (code fuse.Status) { + + glog.V(4).Infof("reading fs stats") + + if wfs.stats.lastChecked < time.Now().Unix()-20 { + + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.StatisticsRequest{ + Collection: wfs.option.Collection, + Replication: wfs.option.Replication, + Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), + DiskType: string(wfs.option.DiskType), + } + + glog.V(4).Infof("reading filer stats: %+v", request) + resp, err := client.Statistics(context.Background(), request) + if err != nil { + glog.V(0).Infof("reading filer stats %v: %v", request, err) + return err + } + glog.V(4).Infof("read filer stats: %+v", resp) + + wfs.stats.TotalSize = resp.TotalSize + wfs.stats.UsedSize = resp.UsedSize + wfs.stats.FileCount = resp.FileCount + wfs.stats.lastChecked = time.Now().Unix() + + return nil + }) + if err != nil { + glog.V(0).Infof("filer Statistics: %v", err) + return fuse.OK + } + } + + totalDiskSize := wfs.stats.TotalSize + usedDiskSize := wfs.stats.UsedSize + actualFileCount := wfs.stats.FileCount + + // Compute the total number of available blocks + out.Blocks = totalDiskSize / blockSize + + // Compute the number of used blocks + numBlocks := uint64(usedDiskSize / blockSize) + + // Report the number of free and available blocks for the block size + out.Bfree = out.Blocks - numBlocks + out.Bavail = out.Blocks - numBlocks + out.Bsize = uint32(blockSize) + + // Report the total number of possible files in the file system (and those free) + out.Files = math.MaxInt64 + out.Ffree = math.MaxInt64 - actualFileCount + + // Report the maximum length of a name and the minimum fragment size + out.NameLen = 1024 + out.Frsize = uint32(blockSize) + + return fuse.OK +} diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go new file mode 100644 index 000000000..e8feb8342 --- /dev/null +++ b/weed/mount/wfs_filer_client.go @@ -0,0 +1,51 @@ +package mount + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var _ = filer_pb.FilerClient(&WFS{}) + +func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { + + return util.Retry("filer grpc", func() error { + + i := wfs.option.filerIndex + n := len(wfs.option.FilerAddresses) + for x := 0; x < n; x++ { + + filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress() + err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, wfs.option.GrpcDialOption) + + if err != nil { + glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) + } else { + wfs.option.filerIndex = i + return nil + } + + i++ + if i >= n { + i = 0 + } + + } + return err + }) + +} + +func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { + if wfs.option.VolumeServerAccess == "publicUrl" { + return location.PublicUrl + } + return location.Url +} |
