diff options
Diffstat (limited to 'weed/command/mount_std.go')
| -rw-r--r-- | weed/command/mount_std.go | 178 |
1 files changed, 116 insertions, 62 deletions
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 453531d00..2474cf7dd 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -3,7 +3,9 @@ package command import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "os/user" "path" @@ -12,20 +14,27 @@ import ( "strings" "time" - "github.com/jacobsa/daemonize" - "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/util/grace" ) func runMount(cmd *Command, args []string) bool { - util.SetupProfiling(*mountCpuProfile, *mountMemProfile) + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) + if *mountReadRetryTime < time.Second { + *mountReadRetryTime = time.Second + } + util.RetryWaitTime = *mountReadRetryTime umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64) if umaskErr != nil { @@ -33,27 +42,52 @@ func runMount(cmd *Command, args []string) bool { return false } - return RunMount( - *mountOptions.filer, - *mountOptions.filerMountRootPath, - *mountOptions.dir, - *mountOptions.collection, - *mountOptions.replication, - *mountOptions.dataCenter, - *mountOptions.chunkSizeLimitMB, - *mountOptions.allowOthers, - *mountOptions.ttlSec, - *mountOptions.dirListCacheLimit, - os.FileMode(umask), - ) + if len(args) > 0 { + return false + } + + return RunMount(&mountOptions, os.FileMode(umask)) } -func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, - allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { +func RunMount(option *MountOptions, umask os.FileMode) bool { + + filer := *option.filer + // parse filer grpc address + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer) + if err != nil { + glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) + return true + } util.LoadConfiguration("security", false) + // try to connect to filer, filerBucketsPath may be useful later + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + var cipher bool + for i := 0; i < 10; i++ { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.V(0).Infof("wait for %d seconds ...", i+1) + time.Sleep(time.Duration(i+1) * time.Second) + } + } + if err != nil { + glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err) + return true + } + + filerMountRootPath := *option.filerMountRootPath + dir := util.ResolvePath(*option.dir) + chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") return false @@ -65,15 +99,21 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.Unmount(dir) - uid, gid := uint32(0), uint32(0) - // detect mount folder mode - mountMode := os.ModeDir | 0755 + if *option.dirAutoCreate { + os.MkdirAll(dir, os.FileMode(0777)&^umask) + } fileInfo, err := os.Stat(dir) + + uid, gid := uint32(0), uint32(0) + mountMode := os.ModeDir | 0777 if err == nil { - mountMode = os.ModeDir | fileInfo.Mode() + mountMode = os.ModeDir | os.FileMode(0777)&^umask uid, gid = util.GetFileUidGid(fileInfo) - fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode()) + fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode) + } else { + fmt.Printf("can not stat %s\n", dir) + return false } if uid == 0 { @@ -88,10 +128,17 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente } } + // mapping uid, gid + uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap) + if err != nil { + fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err) + return false + } + // Ensure target mount point availability if isValid := checkMountPointAvailable(dir); !isValid { glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir) - return false + return true } mountName := path.Base(dir) @@ -100,10 +147,8 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.VolumeName(mountName), fuse.FSName(filer + ":" + filerMountRootPath), fuse.Subtype("seaweedfs"), - fuse.NoAppleDouble(), + // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders fuse.NoAppleXattr(), - fuse.NoBrowse(), - fuse.AutoXattr(), fuse.ExclCreate(), fuse.DaemonTimeout("3600"), fuse.AllowSUID(), @@ -111,68 +156,77 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.MaxReadahead(1024 * 128), fuse.AsyncRead(), fuse.WritebackCache(), - fuse.AllowNonEmptyMount(), + fuse.MaxBackground(128), + fuse.CongestionThreshold(128), } options = append(options, osSpecificMountOptions()...) - - if allowOthers { + if *option.allowOthers { options = append(options, fuse.AllowOther()) } - - c, err := fuse.Mount(dir, options...) - if err != nil { - glog.V(0).Infof("mount: %v", err) - daemonize.SignalOutcome(err) - return true + if *option.nonempty { + options = append(options, fuse.AllowNonEmptyMount()) } - - util.OnInterrupt(func() { - fuse.Unmount(dir) - c.Close() - }) - - filerGrpcAddress, err := parseFilerGrpcAddress(filer) - if err != nil { - glog.V(0).Infof("parseFilerGrpcAddress: %v", err) - daemonize.SignalOutcome(err) - return true + if *option.readOnly { + options = append(options, fuse.ReadOnly()) } + // find mount point mountRoot := filerMountRootPath if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") { mountRoot = mountRoot[0 : len(mountRoot)-1] } - daemonize.SignalOutcome(nil) + diskType := types.ToDiskType(*option.diskType) - err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ + seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ + MountDirectory: dir, + FilerAddress: filer, FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, - Collection: collection, - Replication: replication, - TtlSec: int32(ttlSec), + Collection: *option.collection, + Replication: *option.replication, + TtlSec: int32(*option.ttlSec), + DiskType: diskType, ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - DataCenter: dataCenter, - DirListCacheLimit: dirListCacheLimit, - EntryCacheTtl: 3 * time.Second, + ConcurrentWriters: *option.concurrentWriters, + CacheDir: *option.cacheDir, + CacheSizeMB: *option.cacheSizeMB, + DataCenter: *option.dataCenter, MountUid: uid, MountGid: gid, MountMode: mountMode, MountCtime: fileInfo.ModTime(), MountMtime: time.Now(), Umask: umask, - })) + VolumeServerAccess: *mountOptions.volumeServerAccess, + Cipher: cipher, + UidGidMapper: uidGidMapper, + }) + + // mount + c, err := fuse.Mount(dir, options...) if err != nil { - fuse.Unmount(dir) + glog.V(0).Infof("mount: %v", err) + return true } + defer fuse.Unmount(dir) + + grace.OnInterrupt(func() { + fuse.Unmount(dir) + c.Close() + }) + + glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir) + server := fs.New(c, nil) + seaweedFileSystem.Server = server + err = server.Serve(seaweedFileSystem) // check if the mount process has an error to report <-c.Ready if err := c.MountError; err != nil { glog.V(0).Infof("mount process: %v", err) - daemonize.SignalOutcome(err) return true } |
