diff options
| author | binbinshi <javabinbin@126.com> | 2020-02-05 16:56:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-02-05 16:56:23 +0800 |
| commit | d892cad15d748327c2b7c649f6398ff35d8dce0b (patch) | |
| tree | 29cb8adae01d9f4eaeabb02996d162700da2de1a /weed/command | |
| parent | d4f755347e4874cf0a2fd13480580f348b86a465 (diff) | |
| parent | 8d94564f4152cd890d5896a3dedf5e7589c5023e (diff) | |
| download | seaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.tar.xz seaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.zip | |
Merge pull request #1 from chrislusf/master
update from chrisluf
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 14 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 3 | ||||
| -rw-r--r-- | weed/command/compact.go | 5 | ||||
| -rw-r--r-- | weed/command/export.go | 22 | ||||
| -rw-r--r-- | weed/command/filer.go | 7 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 14 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 17 | ||||
| -rw-r--r-- | weed/command/fix.go | 30 | ||||
| -rw-r--r-- | weed/command/master.go | 13 | ||||
| -rw-r--r-- | weed/command/mount.go | 8 | ||||
| -rw-r--r-- | weed/command/mount_darwin.go | 4 | ||||
| -rw-r--r-- | weed/command/mount_freebsd.go | 4 | ||||
| -rw-r--r-- | weed/command/mount_linux.go | 146 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 32 | ||||
| -rw-r--r-- | weed/command/s3.go | 7 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 62 | ||||
| -rw-r--r-- | weed/command/scaffold_test.go | 44 | ||||
| -rw-r--r-- | weed/command/server.go | 1 | ||||
| -rw-r--r-- | weed/command/shell.go | 27 | ||||
| -rw-r--r-- | weed/command/upload.go | 6 | ||||
| -rw-r--r-- | weed/command/volume.go | 157 | ||||
| -rw-r--r-- | weed/command/webdav.go | 3 |
22 files changed, 464 insertions, 162 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 505de4ae6..eb2b5ba4a 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -5,8 +5,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" @@ -64,7 +64,7 @@ var cmdBackup = &Command{ func runBackup(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") if *s.volumeId == -1 { return false @@ -98,15 +98,15 @@ func runBackup(cmd *Command, args []string) bool { return true } } - var replication *storage.ReplicaPlacement + var replication *super_block.ReplicaPlacement if *s.replication != "" { - replication, err = storage.NewReplicaPlacementFromString(*s.replication) + replication, err = super_block.NewReplicaPlacementFromString(*s.replication) if err != nil { fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) return true } } else { - replication, err = storage.NewReplicaPlacementFromString(stats.Replication) + replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) if err != nil { fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) return true @@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact(0, 0); err != nil { + if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } @@ -128,7 +128,7 @@ func runBackup(cmd *Command, args []string) bool { return true } v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) - v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0) + v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) } datSize, _, _ := v.FileStat() diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 26be1fe3a..382e7c850 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -15,7 +15,6 @@ import ( "sync" "time" - "github.com/spf13/viper" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -109,7 +108,7 @@ var ( func runBenchmark(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { diff --git a/weed/command/compact.go b/weed/command/compact.go index 4a54f5670..85313b749 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -17,6 +17,9 @@ var cmdCompact = &Command{ The compacted .dat file is stored as .cpd file. The compacted .idx file is stored as .cpx file. + For method=0, it compacts based on the .dat file, works if .idx file is corrupted. + For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files. + `, } @@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(); err != nil { + if err = v.Compact2(preallocate); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/command/export.go b/weed/command/export.go index d3a765e09..8d664ad3b 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -4,6 +4,7 @@ import ( "archive/tar" "bytes" "fmt" + "io" "os" "path" "path/filepath" @@ -12,11 +13,11 @@ import ( "text/template" "time" - "io" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -89,12 +90,12 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, type VolumeFileScanner4Export struct { version needle.Version counter int - needleMap *storage.NeedleMap + needleMap *needle_map.MemDb vid needle.VolumeId } -func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } @@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool { fileName = *export.collection + "_" + fileName } vid := needle.VolumeId(*export.volumeId) - indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - needleMap, err := storage.LoadBtreeNeedleMap(indexFile) - if err != nil { - glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) + needleMap := needle_map.NewMemDb() + if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil { + glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err) } volumeFileScanner := &VolumeFileScanner4Export{ diff --git a/weed/command/filer.go b/weed/command/filer.go index b1ceb46f5..ea8392fac 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -6,14 +6,13 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc/reflection" ) var ( @@ -145,7 +144,7 @@ func (fo *FilerOptions) startFiler() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 9995cf6aa..e5979d786 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -14,13 +14,13 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/spf13/viper" - "google.golang.org/grpc" ) var ( @@ -105,7 +105,7 @@ func runCopy(cmd *Command, args []string) bool { filerGrpcPort := filerPort + 10000 filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) - copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") ctx := context.Background() @@ -331,7 +331,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy }, } - if _, err := client.CreateEntry(ctx, request); err != nil { + if err := filer_pb.CreateEntry(ctx, client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -378,7 +378,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), io.NewSectionReader(f, i*chunkSize, chunkSize), - false, "application/octet-stream", nil, assignResult.Auth) + false, "", nil, assignResult.Auth) if err != nil { uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) return @@ -435,7 +435,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC }, } - if _, err := client.CreateEntry(ctx, request); err != nil { + if err := filer_pb.CreateEntry(ctx, client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -466,7 +466,7 @@ func detectMimeType(f *os.File) string { func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(clientConn) return fn(client) }, filerAddress, grpcDialOption) diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index c6e7f5dba..737f0d24a 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -39,7 +39,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("replication", true) util.LoadConfiguration("notification", true) - config := viper.GetViper() + config := util.GetViper() var notificationInput sub.NotificationInput @@ -47,8 +47,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { - viperSub := config.Sub("notification." + input.GetName()) - if err := input.Initialize(viperSub); err != nil { + if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification input for %s: %+v", input.GetName(), err) } @@ -66,10 +65,9 @@ func runFilerReplicate(cmd *Command, args []string) bool { // avoid recursive replication if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") { - sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer") - if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { - fromDir := sourceConfig.GetString("directory") - toDir := sinkConfig.GetString("directory") + if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") { + fromDir := config.GetString("source.filer.directory") + toDir := config.GetString("sink.filer.directory") if strings.HasPrefix(toDir, fromDir) { glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) } @@ -79,8 +77,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { var dataSink sink.ReplicationSink for _, sk := range sink.Sinks { if config.GetBool("sink." + sk.GetName() + ".enabled") { - viperSub := config.Sub("sink." + sk.GetName()) - if err := sk.Initialize(viperSub); err != nil { + if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize sink for %s: %+v", sk.GetName(), err) } @@ -98,7 +95,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { return true } - replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink) + replicator := replication.NewReplicator(config, "source.filer.", dataSink) for { key, m, err := notificationInput.ReceiveMessage() diff --git a/weed/command/fix.go b/weed/command/fix.go index 2fbbca5e6..8903595fa 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -8,6 +8,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -31,11 +33,11 @@ var ( type VolumeFileScanner4Fix struct { version needle.Version - nm *storage.NeedleMap + nm *needle_map.MemDb } -func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } @@ -46,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { - pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) + pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id, types.ToOffset(offset)) + return scanner.nm.Delete(n.Id) } return nil } @@ -66,23 +68,21 @@ func runFix(cmd *Command, args []string) bool { baseFileName = *fixVolumeCollection + "_" + baseFileName } indexFileName := path.Join(*fixVolumePath, baseFileName+".idx") - indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - nm := storage.NewBtreeNeedleMap(indexFile) - defer nm.Close() + nm := needle_map.NewMemDb() vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ nm: nm, } - err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner) - if err != nil { - glog.Fatalf("Export Volume File [ERROR] %s\n", err) + if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil { + glog.Fatalf("scan .dat File: %v", err) + os.Remove(indexFileName) + } + + if err := nm.SaveToIdx(indexFileName); err != nil { + glog.Fatalf("save to .idx File: %v", err) os.Remove(indexFileName) } diff --git a/weed/command/master.go b/weed/command/master.go index 3d33f4f7a..c4b11119b 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -8,14 +8,15 @@ import ( "strings" "github.com/chrislusf/raft/protobuf" + "github.com/gorilla/mux" + "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" - "github.com/spf13/viper" - "google.golang.org/grpc/reflection" ) var ( @@ -101,6 +102,8 @@ func runMaster(cmd *Command, args []string) bool { func startMaster(masterOption MasterOptions, masterWhiteList []string) { + backend.LoadConfiguration(util.GetViper()) + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) r := mux.NewRouter() @@ -112,7 +115,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("Master startup error: %v", e) } // start raftServer - raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder) @@ -126,7 +129,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) diff --git a/weed/command/mount.go b/weed/command/mount.go index 71c1a4387..f09b285f7 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -10,7 +10,7 @@ type MountOptions struct { filer *string filerMountRootPath *string dir *string - dirListingLimit *int + dirListCacheLimit *int64 collection *string replication *string ttlSec *int @@ -31,7 +31,7 @@ func init() { mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server") mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") - mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size") + mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") @@ -64,12 +64,12 @@ var cmdMount = &Command{ func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { hostnameAndPort := strings.Split(filer, ":") if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort) + return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) } filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) if parseErr != nil { - return "", fmt.Errorf("The filer filer port parse error: %v", parseErr) + return "", fmt.Errorf("filer port parse error: %v", parseErr) } filerGrpcPort := int(filerPort) + 10000 diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go index 632691e47..f0a5581e7 100644 --- a/weed/command/mount_darwin.go +++ b/weed/command/mount_darwin.go @@ -7,3 +7,7 @@ import ( func osSpecificMountOptions() []fuse.MountOption { return []fuse.MountOption{} } + +func checkMountPointAvailable(dir string) bool { + return true +} diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go index 632691e47..f0a5581e7 100644 --- a/weed/command/mount_freebsd.go +++ b/weed/command/mount_freebsd.go @@ -7,3 +7,7 @@ import ( func osSpecificMountOptions() []fuse.MountOption { return []fuse.MountOption{} } + +func checkMountPointAvailable(dir string) bool { + return true +} diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go index 7d94e5142..80a5f9da4 100644 --- a/weed/command/mount_linux.go +++ b/weed/command/mount_linux.go @@ -1,11 +1,157 @@ package command import ( + "bufio" + "fmt" + "io" + "os" + "strings" + "github.com/seaweedfs/fuse" ) +const ( + /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue + (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11) + + (1) mount ID: unique identifier of the mount (may be reused after umount) + (2) parent ID: ID of parent (or of self for the top of the mount tree) + (3) major:minor: value of st_dev for files on filesystem + (4) root: root of the mount within the filesystem + (5) mount point: mount point relative to the process's root + (6) mount options: per mount options + (7) optional fields: zero or more fields of the form "tag[:value]" + (8) separator: marks the end of the optional fields + (9) filesystem type: name of filesystem of the form "type[.subtype]" + (10) mount source: filesystem specific information or "none" + (11) super options: per super block options*/ + mountinfoFormat = "%d %d %d:%d %s %s %s %s" +) + +// Info reveals information about a particular mounted filesystem. This +// struct is populated from the content in the /proc/<pid>/mountinfo file. +type Info struct { + // ID is a unique identifier of the mount (may be reused after umount). + ID int + + // Parent indicates the ID of the mount parent (or of self for the top of the + // mount tree). + Parent int + + // Major indicates one half of the device ID which identifies the device class. + Major int + + // Minor indicates one half of the device ID which identifies a specific + // instance of device. + Minor int + + // Root of the mount within the filesystem. + Root string + + // Mountpoint indicates the mount point relative to the process's root. + Mountpoint string + + // Opts represents mount-specific options. + Opts string + + // Optional represents optional fields. + Optional string + + // Fstype indicates the type of filesystem, such as EXT3. + Fstype string + + // Source indicates filesystem specific information or "none". + Source string + + // VfsOpts represents per super block options. + VfsOpts string +} + +// Mounted determines if a specified mountpoint has been mounted. +// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab. +func mounted(mountPoint string) (bool, error) { + entries, err := parseMountTable() + if err != nil { + return false, err + } + + // Search the table for the mountPoint + for _, e := range entries { + if e.Mountpoint == mountPoint { + return true, nil + } + } + return false, nil +} + +// Parse /proc/self/mountinfo because comparing Dev and ino does not work from +// bind mounts +func parseMountTable() ([]*Info, error) { + f, err := os.Open("/proc/self/mountinfo") + if err != nil { + return nil, err + } + defer f.Close() + + return parseInfoFile(f) +} + +func parseInfoFile(r io.Reader) ([]*Info, error) { + var ( + s = bufio.NewScanner(r) + out []*Info + ) + + for s.Scan() { + if err := s.Err(); err != nil { + return nil, err + } + + var ( + p = &Info{} + text = s.Text() + optionalFields string + ) + + if _, err := fmt.Sscanf(text, mountinfoFormat, + &p.ID, &p.Parent, &p.Major, &p.Minor, + &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil { + return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err) + } + // Safe as mountinfo encodes mountpoints with spaces as \040. + index := strings.Index(text, " - ") + postSeparatorFields := strings.Fields(text[index+3:]) + if len(postSeparatorFields) < 3 { + return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text) + } + + if optionalFields != "-" { + p.Optional = optionalFields + } + + p.Fstype = postSeparatorFields[0] + p.Source = postSeparatorFields[1] + p.VfsOpts = strings.Join(postSeparatorFields[2:], " ") + out = append(out, p) + } + return out, nil +} + func osSpecificMountOptions() []fuse.MountOption { return []fuse.MountOption{ fuse.AllowNonEmptyMount(), } } + +func checkMountPointAvailable(dir string) bool { + mountPoint := dir + if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") { + mountPoint = mountPoint[0 : len(mountPoint)-1] + } + + if mounted, err := mounted(mountPoint); err != nil || mounted { + return false + } + + return true +} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 6ca9bfdca..891810e61 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -12,12 +12,11 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/jacobsa/daemonize" - "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" @@ -43,13 +42,13 @@ func runMount(cmd *Command, args []string) bool { *mountOptions.chunkSizeLimitMB, *mountOptions.allowOthers, *mountOptions.ttlSec, - *mountOptions.dirListingLimit, + *mountOptions.dirListCacheLimit, os.FileMode(umask), ) } func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, - allowOthers bool, ttlSec int, dirListingLimit int, umask os.FileMode) bool { + allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { util.LoadConfiguration("security", false) @@ -88,12 +87,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente } } + // Ensure target mount point availability + if isValid := checkMountPointAvailable(dir); !isValid { + glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir) + return false + } + mountName := path.Base(dir) options := []fuse.MountOption{ fuse.VolumeName(mountName), - fuse.FSName("SeaweedFS"), - fuse.Subtype("SeaweedFS"), + fuse.FSName(filer + ":" + filerMountRootPath), + fuse.Subtype("seaweedfs"), fuse.NoAppleDouble(), fuse.NoAppleXattr(), fuse.NoBrowse(), @@ -116,9 +121,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente c, err := fuse.Mount(dir, options...) if err != nil { - glog.Fatal(err) + glog.V(0).Infof("mount: %v", err) daemonize.SignalOutcome(err) - return false + return true } util.OnInterrupt(func() { @@ -128,9 +133,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente filerGrpcAddress, err := parseFilerGrpcAddress(filer) if err != nil { - glog.Fatal(err) + glog.V(0).Infof("parseFilerGrpcAddress: %v", err) daemonize.SignalOutcome(err) - return false + return true } mountRoot := filerMountRootPath @@ -142,14 +147,14 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), FilerMountRootPath: mountRoot, Collection: collection, Replication: replication, TtlSec: int32(ttlSec), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, DataCenter: dataCenter, - DirListingLimit: dirListingLimit, + DirListCacheLimit: dirListCacheLimit, EntryCacheTtl: 3 * time.Second, MountUid: uid, MountGid: gid, @@ -165,8 +170,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente // check if the mount process has an error to report <-c.Ready if err := c.MountError; err != nil { - glog.Fatal(err) + glog.V(0).Infof("mount process: %v", err) daemonize.SignalOutcome(err) + return true } return true diff --git a/weed/command/s3.go b/weed/command/s3.go index e004bb066..10a486657 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -1,18 +1,17 @@ package command import ( + "fmt" "net/http" "time" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "fmt" + "github.com/gorilla/mux" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/s3api" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" ) var ( @@ -69,7 +68,7 @@ func (s3opt *S3Options) startS3Server() bool { FilerGrpcAddress: filerGrpcAddress, DomainName: *s3opt.domainName, BucketsPath: *s3opt.filerBucketsPath, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 7a988cdcf..ab658735f 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -14,6 +14,14 @@ var cmdScaffold = &Command{ Short: "generate basic configuration files", Long: `Generate filer.toml with all possible configurations for you to customize. + The options can also be overwritten by environment variables. + For example, the filer.toml mysql password can be overwritten by environment variable + export WEED_MYSQL_PASSWORD=some_password + Environment variable rules: + * Prefix fix with "WEED_" + * Upppercase the reset of variable name. + * Replace '.' with '_' + `, } @@ -59,14 +67,18 @@ const ( # $HOME/.seaweedfs/filer.toml # /etc/seaweedfs/filer.toml -[memory] -# local in memory, mostly for testing purpose -enabled = false +#################################################### +# Customizable filer server options +#################################################### +[filer.options] +# with http DELETE, by default the filer would check whether a folder is empty. +# recursive_delete will delete all sub folders and files, similar to "rm -Rf" +recursive_delete = false -[leveldb] -# local on disk, mostly for simple single-machine setup, fairly scalable -enabled = false -dir = "." # directory to store level db files + +#################################################### +# The following are filer store options +#################################################### [leveldb2] # local on disk, mostly for simple single-machine setup, fairly scalable @@ -74,10 +86,6 @@ dir = "." # directory to store level db files enabled = true dir = "." # directory to store level db files -#################################################### -# multiple filers on shared storage, fairly scalable -#################################################### - [mysql] # or tidb # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', @@ -95,6 +103,7 @@ password = "" database = "" # create or use an existing database connection_max_idle = 2 connection_max_open = 100 +interpolateParams = false [postgres] # or cockroachdb # CREATE TABLE IF NOT EXISTS filemeta ( @@ -144,6 +153,10 @@ addresses = [ "localhost:30006", ] password = "" +# allows reads from slave servers or the master, but all writes still go to the master +readOnly = true +# automatically use the closest Redis server for reads +routeByLatency = true [etcd] enabled = false @@ -346,5 +359,32 @@ scripts = """ """ sleep_minutes = 17 # sleep minutes between each script execution +[master.filer] +default_filer_url = "http://localhost:8888/" + +[master.sequencer] +type = "memory" # Choose [memory|etcd] type for storing the file id sequence +# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence +# example : http://127.0.0.1:2379,http://127.0.0.1:2389 +sequencer_etcd_urls = "http://127.0.0.1:2379" + + +# configurations for tiered cloud storage +# old volumes are transparently moved to cloud for cost efficiency +[storage.backend] + [storage.backend.s3.default] + enabled = false + aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). + aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). + region = "us-east-2" + bucket = "your_bucket_name" # an existing bucket + +# create this number of logical volumes if no more writable volumes +[master.volume_growth] +count_1 = 7 # create 1 x 7 = 7 actual volumes +count_2 = 6 # create 2 x 6 = 12 actual volumes +count_3 = 3 # create 3 x 3 = 9 actual volumes +count_other = 1 # create n x 1 = n actual volumes + ` ) diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go new file mode 100644 index 000000000..423dacc32 --- /dev/null +++ b/weed/command/scaffold_test.go @@ -0,0 +1,44 @@ +package command + +import ( + "bytes" + "fmt" + "testing" + + "github.com/spf13/viper" +) + +func TestReadingTomlConfiguration(t *testing.T) { + + viper.SetConfigType("toml") + + // any approach to require this configuration into your program. + var tomlExample = []byte(` +[database] +server = "192.168.1.1" +ports = [ 8001, 8001, 8002 ] +connection_max = 5000 +enabled = true + +[servers] + + # You can indent as you please. Tabs or spaces. TOML don't care. + [servers.alpha] + ip = "10.0.0.1" + dc = "eqdc10" + + [servers.beta] + ip = "10.0.0.2" + dc = "eqdc10" + +`) + + viper.ReadConfig(bytes.NewBuffer(tomlExample)) + + fmt.Printf("database is %v\n", viper.Get("database")) + fmt.Printf("servers is %v\n", viper.GetStringMap("servers")) + + alpha := viper.Sub("servers.alpha") + + fmt.Printf("alpha ip is %v\n", alpha.GetString("ip")) +} diff --git a/weed/command/server.go b/weed/command/server.go index 87f404ed3..6aa68b6d2 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -89,6 +89,7 @@ func init() { serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") + serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") diff --git a/weed/command/shell.go b/weed/command/shell.go index 91aa8770a..dcf70608f 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -2,14 +2,10 @@ package command import ( "fmt" - "net/url" - "strconv" - "strings" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) var ( @@ -34,10 +30,10 @@ var cmdShell = &Command{ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) - shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") var filerPwdErr error - shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = parseFilerUrl(*shellInitialFilerUrl) + shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl) if filerPwdErr != nil { fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr) return false @@ -48,22 +44,3 @@ func runShell(command *Command, args []string) bool { return true } - -func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { - if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") { - entryPath = "http://" + entryPath - } - - var u *url.URL - u, err = url.Parse(entryPath) - if err != nil { - return - } - filerServer = u.Hostname() - portString := u.Port() - if portString != "" { - filerPort, err = strconv.ParseInt(portString, 10, 32) - } - path = u.Path - return -} diff --git a/weed/command/upload.go b/weed/command/upload.go index 25e938d9b..d71046131 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -6,11 +6,9 @@ import ( "os" "path/filepath" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" - - "github.com/chrislusf/seaweedfs/weed/operation" ) var ( @@ -63,7 +61,7 @@ var cmdUpload = &Command{ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") if len(args) == 0 { if *upload.dir == "" { diff --git a/weed/command/volume.go b/weed/command/volume.go index 3c1aa2b50..9d665d143 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -1,6 +1,7 @@ package command import ( + "fmt" "net/http" "os" "runtime" @@ -9,15 +10,19 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/spf13/viper" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util/httpdown" + + "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc/reflection" ) var ( @@ -44,6 +49,7 @@ type VolumeServerOptions struct { cpuProfile *string memProfile *string compactionMBPerSecond *int + fileSizeLimitMB *int } func init() { @@ -64,6 +70,7 @@ func init() { v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") + v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") } var cmdVolume = &Command{ @@ -94,7 +101,7 @@ func runVolume(cmd *Command, args []string) bool { func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { - //Set multiple folders and each folder's max volume count limit' + // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { @@ -113,7 +120,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } - //security related white list configuration + // security related white list configuration if volumeWhiteListOption != "" { v.whiteList = strings.Split(volumeWhiteListOption, ",") } @@ -128,11 +135,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if *v.publicUrl == "" { *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) } - isSeperatedPublicPort := *v.publicPort != *v.port volumeMux := http.NewServeMux() publicVolumeMux := volumeMux - if isSeperatedPublicPort { + if v.isSeparatedPublicPort() { publicVolumeMux = http.NewServeMux() } @@ -156,53 +162,134 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v v.whiteList, *v.fixJpgOrientation, *v.readRedirect, *v.compactionMBPerSecond, + *v.fileSizeLimitMB, ) - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) - glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } - if isSeperatedPublicPort { - publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) + // starting grpc server + grpcS := v.startGrpcService(volumeServer) + + // starting public http server + var publicHttpDown httpdown.Server + if v.isSeparatedPublicPort() { + publicHttpDown = v.startPublicHttpService(publicVolumeMux) + if nil == publicHttpDown { + glog.Fatalf("start public http service failed") } - go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } - }() } + // starting the cluster http server + clusterHttpServer := v.startClusterHttpService(volumeMux) + + stopChain := make(chan struct{}) util.OnInterrupt(func() { + fmt.Println("volume server has be killed") + var startTime time.Time + + // firstly, stop the public http service to prevent from receiving new user request + if nil != publicHttpDown { + startTime = time.Now() + if err := publicHttpDown.Stop(); err != nil { + glog.Warningf("stop the public http server failed, %v", err) + } + delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("stop public http server, elapsed %dms", delta) + } + + startTime = time.Now() + if err := clusterHttpServer.Stop(); err != nil { + glog.Warningf("stop the cluster http server failed, %v", err) + } + delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta) + + startTime = time.Now() + grpcS.GracefulStop() + delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta) + + startTime = time.Now() volumeServer.Shutdown() + delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("stop volume server, elapsed [%d]", delta) + pprof.StopCPUProfile() + + close(stopChain) // notify exit }) - // starting grpc server + select { + case <-stopChain: + } + glog.Warningf("the volume server exit.") +} + +// check whether configure the public port +func (v VolumeServerOptions) isSeparatedPublicPort() bool { + return *v.publicPort != *v.port +} + +func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server { grpcPort := *v.port + 10000 grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume")) - volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) + volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) - go grpcS.Serve(grpcL) - - if viper.GetString("https.volume.key") != "" { - if e := http.ServeTLS(listener, volumeMux, - viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) + go func() { + if err := grpcS.Serve(grpcL); err != nil { + glog.Fatalf("start gRPC service failed, %s", err) } - } else { - if e := http.Serve(listener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) + }() + return grpcS +} + +func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { + publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + + pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute} + publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener) + go func() { + if err := publicHttpDown.Wait(); err != nil { + glog.Errorf("public http down wait failed, %v", err) } + }() + + return publicHttpDown +} + +func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server { + var ( + certFile, keyFile string + ) + if viper.GetString("https.volume.key") != "" { + certFile = viper.GetString("https.volume.cert") + keyFile = viper.GetString("https.volume.key") } + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + + httpDown := httpdown.HTTP{ + KillTimeout: 5 * time.Minute, + StopTimeout: 5 * time.Minute, + CertFile: certFile, + KeyFile: keyFile} + clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener) + go func() { + if e := clusterHttpServer.Wait(); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } + }() + return clusterHttpServer } diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 371c4a9ad..0e6f89040 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -11,7 +11,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) var ( @@ -75,7 +74,7 @@ func (wo *WebDavOption) startWebDav() bool { ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ Filer: *wo.filer, FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), Collection: *wo.collection, Uid: uid, Gid: gid, |
