aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorbinbinshi <javabinbin@126.com>2020-02-05 16:56:23 +0800
committerGitHub <noreply@github.com>2020-02-05 16:56:23 +0800
commitd892cad15d748327c2b7c649f6398ff35d8dce0b (patch)
tree29cb8adae01d9f4eaeabb02996d162700da2de1a /weed/command
parentd4f755347e4874cf0a2fd13480580f348b86a465 (diff)
parent8d94564f4152cd890d5896a3dedf5e7589c5023e (diff)
downloadseaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.tar.xz
seaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.zip
Merge pull request #1 from chrislusf/master
update from chrisluf
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go14
-rw-r--r--weed/command/benchmark.go3
-rw-r--r--weed/command/compact.go5
-rw-r--r--weed/command/export.go22
-rw-r--r--weed/command/filer.go7
-rw-r--r--weed/command/filer_copy.go14
-rw-r--r--weed/command/filer_replication.go17
-rw-r--r--weed/command/fix.go30
-rw-r--r--weed/command/master.go13
-rw-r--r--weed/command/mount.go8
-rw-r--r--weed/command/mount_darwin.go4
-rw-r--r--weed/command/mount_freebsd.go4
-rw-r--r--weed/command/mount_linux.go146
-rw-r--r--weed/command/mount_std.go32
-rw-r--r--weed/command/s3.go7
-rw-r--r--weed/command/scaffold.go62
-rw-r--r--weed/command/scaffold_test.go44
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/command/shell.go27
-rw-r--r--weed/command/upload.go6
-rw-r--r--weed/command/volume.go157
-rw-r--r--weed/command/webdav.go3
22 files changed, 464 insertions, 162 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 505de4ae6..eb2b5ba4a 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -5,8 +5,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -64,7 +64,7 @@ var cmdBackup = &Command{
func runBackup(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if *s.volumeId == -1 {
return false
@@ -98,15 +98,15 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
- var replication *storage.ReplicaPlacement
+ var replication *super_block.ReplicaPlacement
if *s.replication != "" {
- replication, err = storage.NewReplicaPlacementFromString(*s.replication)
+ replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
if err != nil {
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
return true
}
} else {
- replication, err = storage.NewReplicaPlacementFromString(stats.Replication)
+ replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
if err != nil {
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
return true
@@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool {
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact(0, 0); err != nil {
+ if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
@@ -128,7 +128,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
- v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0)
+ v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
}
datSize, _, _ := v.FileStat()
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 26be1fe3a..382e7c850 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -15,7 +15,6 @@ import (
"sync"
"time"
- "github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -109,7 +108,7 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 4a54f5670..85313b749 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -17,6 +17,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
+ For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
+ For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
+
`,
}
@@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(); err != nil {
+ if err = v.Compact2(preallocate); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/command/export.go b/weed/command/export.go
index d3a765e09..8d664ad3b 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"fmt"
+ "io"
"os"
"path"
"path/filepath"
@@ -12,11 +13,11 @@ import (
"text/template"
"time"
- "io"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -89,12 +90,12 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
type VolumeFileScanner4Export struct {
version needle.Version
counter int
- needleMap *storage.NeedleMap
+ needleMap *needle_map.MemDb
vid needle.VolumeId
}
-func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool {
fileName = *export.collection + "_" + fileName
}
vid := needle.VolumeId(*export.volumeId)
- indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
- if err != nil {
- glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
+ needleMap := needle_map.NewMemDb()
+ if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
volumeFileScanner := &VolumeFileScanner4Export{
diff --git a/weed/command/filer.go b/weed/command/filer.go
index b1ceb46f5..ea8392fac 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -6,14 +6,13 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
+ "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -145,7 +144,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer"))
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 9995cf6aa..e5979d786 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -14,13 +14,13 @@ import (
"sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
)
var (
@@ -105,7 +105,7 @@ func runCopy(cmd *Command, args []string) bool {
filerGrpcPort := filerPort + 10000
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
- copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
ctx := context.Background()
@@ -331,7 +331,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -378,7 +378,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
uploadResult, err := operation.Upload(targetUrl,
fileName+"-"+strconv.FormatInt(i+1, 10),
io.NewSectionReader(f, i*chunkSize, chunkSize),
- false, "application/octet-stream", nil, assignResult.Auth)
+ false, "", nil, assignResult.Auth)
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -435,7 +435,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -466,7 +466,7 @@ func detectMimeType(f *os.File) string {
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(clientConn)
return fn(client)
}, filerAddress, grpcDialOption)
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index c6e7f5dba..737f0d24a 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -39,7 +39,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
- config := viper.GetViper()
+ config := util.GetViper()
var notificationInput sub.NotificationInput
@@ -47,8 +47,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
- viperSub := config.Sub("notification." + input.GetName())
- if err := input.Initialize(viperSub); err != nil {
+ if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
@@ -66,10 +65,9 @@ func runFilerReplicate(cmd *Command, args []string) bool {
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
- sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
- if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
- fromDir := sourceConfig.GetString("directory")
- toDir := sinkConfig.GetString("directory")
+ if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
+ fromDir := config.GetString("source.filer.directory")
+ toDir := config.GetString("sink.filer.directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
@@ -79,8 +77,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
- viperSub := config.Sub("sink." + sk.GetName())
- if err := sk.Initialize(viperSub); err != nil {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
@@ -98,7 +95,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
return true
}
- replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
+ replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
key, m, err := notificationInput.ReceiveMessage()
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 2fbbca5e6..8903595fa 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -8,6 +8,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -31,11 +33,11 @@ var (
type VolumeFileScanner4Fix struct {
version needle.Version
- nm *storage.NeedleMap
+ nm *needle_map.MemDb
}
-func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -46,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
- pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id, types.ToOffset(offset))
+ return scanner.nm.Delete(n.Id)
}
return nil
}
@@ -66,23 +68,21 @@ func runFix(cmd *Command, args []string) bool {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
- indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
- defer nm.Close()
+ nm := needle_map.NewMemDb()
vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{
nm: nm,
}
- err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner)
- if err != nil {
- glog.Fatalf("Export Volume File [ERROR] %s\n", err)
+ if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
+ glog.Fatalf("scan .dat File: %v", err)
+ os.Remove(indexFileName)
+ }
+
+ if err := nm.SaveToIdx(indexFileName); err != nil {
+ glog.Fatalf("save to .idx File: %v", err)
os.Remove(indexFileName)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index 3d33f4f7a..c4b11119b 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -8,14 +8,15 @@ import (
"strings"
"github.com/chrislusf/raft/protobuf"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
- "github.com/spf13/viper"
- "google.golang.org/grpc/reflection"
)
var (
@@ -101,6 +102,8 @@ func runMaster(cmd *Command, args []string) bool {
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
+ backend.LoadConfiguration(util.GetViper())
+
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
r := mux.NewRouter()
@@ -112,7 +115,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
+ raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
@@ -126,7 +129,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 71c1a4387..f09b285f7 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -10,7 +10,7 @@ type MountOptions struct {
filer *string
filerMountRootPath *string
dir *string
- dirListingLimit *int
+ dirListCacheLimit *int64
collection *string
replication *string
ttlSec *int
@@ -31,7 +31,7 @@ func init() {
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
- mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size")
+ mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
@@ -64,12 +64,12 @@ var cmdMount = &Command{
func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
hostnameAndPort := strings.Split(filer, ":")
if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort)
+ return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
}
filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
if parseErr != nil {
- return "", fmt.Errorf("The filer filer port parse error: %v", parseErr)
+ return "", fmt.Errorf("filer port parse error: %v", parseErr)
}
filerGrpcPort := int(filerPort) + 10000
diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go
index 632691e47..f0a5581e7 100644
--- a/weed/command/mount_darwin.go
+++ b/weed/command/mount_darwin.go
@@ -7,3 +7,7 @@ import (
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go
index 632691e47..f0a5581e7 100644
--- a/weed/command/mount_freebsd.go
+++ b/weed/command/mount_freebsd.go
@@ -7,3 +7,7 @@ import (
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
index 7d94e5142..80a5f9da4 100644
--- a/weed/command/mount_linux.go
+++ b/weed/command/mount_linux.go
@@ -1,11 +1,157 @@
package command
import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+
"github.com/seaweedfs/fuse"
)
+const (
+ /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue
+ (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11)
+
+ (1) mount ID: unique identifier of the mount (may be reused after umount)
+ (2) parent ID: ID of parent (or of self for the top of the mount tree)
+ (3) major:minor: value of st_dev for files on filesystem
+ (4) root: root of the mount within the filesystem
+ (5) mount point: mount point relative to the process's root
+ (6) mount options: per mount options
+ (7) optional fields: zero or more fields of the form "tag[:value]"
+ (8) separator: marks the end of the optional fields
+ (9) filesystem type: name of filesystem of the form "type[.subtype]"
+ (10) mount source: filesystem specific information or "none"
+ (11) super options: per super block options*/
+ mountinfoFormat = "%d %d %d:%d %s %s %s %s"
+)
+
+// Info reveals information about a particular mounted filesystem. This
+// struct is populated from the content in the /proc/<pid>/mountinfo file.
+type Info struct {
+ // ID is a unique identifier of the mount (may be reused after umount).
+ ID int
+
+ // Parent indicates the ID of the mount parent (or of self for the top of the
+ // mount tree).
+ Parent int
+
+ // Major indicates one half of the device ID which identifies the device class.
+ Major int
+
+ // Minor indicates one half of the device ID which identifies a specific
+ // instance of device.
+ Minor int
+
+ // Root of the mount within the filesystem.
+ Root string
+
+ // Mountpoint indicates the mount point relative to the process's root.
+ Mountpoint string
+
+ // Opts represents mount-specific options.
+ Opts string
+
+ // Optional represents optional fields.
+ Optional string
+
+ // Fstype indicates the type of filesystem, such as EXT3.
+ Fstype string
+
+ // Source indicates filesystem specific information or "none".
+ Source string
+
+ // VfsOpts represents per super block options.
+ VfsOpts string
+}
+
+// Mounted determines if a specified mountpoint has been mounted.
+// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab.
+func mounted(mountPoint string) (bool, error) {
+ entries, err := parseMountTable()
+ if err != nil {
+ return false, err
+ }
+
+ // Search the table for the mountPoint
+ for _, e := range entries {
+ if e.Mountpoint == mountPoint {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// Parse /proc/self/mountinfo because comparing Dev and ino does not work from
+// bind mounts
+func parseMountTable() ([]*Info, error) {
+ f, err := os.Open("/proc/self/mountinfo")
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ return parseInfoFile(f)
+}
+
+func parseInfoFile(r io.Reader) ([]*Info, error) {
+ var (
+ s = bufio.NewScanner(r)
+ out []*Info
+ )
+
+ for s.Scan() {
+ if err := s.Err(); err != nil {
+ return nil, err
+ }
+
+ var (
+ p = &Info{}
+ text = s.Text()
+ optionalFields string
+ )
+
+ if _, err := fmt.Sscanf(text, mountinfoFormat,
+ &p.ID, &p.Parent, &p.Major, &p.Minor,
+ &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil {
+ return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err)
+ }
+ // Safe as mountinfo encodes mountpoints with spaces as \040.
+ index := strings.Index(text, " - ")
+ postSeparatorFields := strings.Fields(text[index+3:])
+ if len(postSeparatorFields) < 3 {
+ return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text)
+ }
+
+ if optionalFields != "-" {
+ p.Optional = optionalFields
+ }
+
+ p.Fstype = postSeparatorFields[0]
+ p.Source = postSeparatorFields[1]
+ p.VfsOpts = strings.Join(postSeparatorFields[2:], " ")
+ out = append(out, p)
+ }
+ return out, nil
+}
+
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{
fuse.AllowNonEmptyMount(),
}
}
+
+func checkMountPointAvailable(dir string) bool {
+ mountPoint := dir
+ if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
+ mountPoint = mountPoint[0 : len(mountPoint)-1]
+ }
+
+ if mounted, err := mounted(mountPoint); err != nil || mounted {
+ return false
+ }
+
+ return true
+}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 6ca9bfdca..891810e61 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -12,12 +12,11 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/jacobsa/daemonize"
- "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -43,13 +42,13 @@ func runMount(cmd *Command, args []string) bool {
*mountOptions.chunkSizeLimitMB,
*mountOptions.allowOthers,
*mountOptions.ttlSec,
- *mountOptions.dirListingLimit,
+ *mountOptions.dirListCacheLimit,
os.FileMode(umask),
)
}
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
- allowOthers bool, ttlSec int, dirListingLimit int, umask os.FileMode) bool {
+ allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool {
util.LoadConfiguration("security", false)
@@ -88,12 +87,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
}
}
+ // Ensure target mount point availability
+ if isValid := checkMountPointAvailable(dir); !isValid {
+ glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ return false
+ }
+
mountName := path.Base(dir)
options := []fuse.MountOption{
fuse.VolumeName(mountName),
- fuse.FSName("SeaweedFS"),
- fuse.Subtype("SeaweedFS"),
+ fuse.FSName(filer + ":" + filerMountRootPath),
+ fuse.Subtype("seaweedfs"),
fuse.NoAppleDouble(),
fuse.NoAppleXattr(),
fuse.NoBrowse(),
@@ -116,9 +121,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
c, err := fuse.Mount(dir, options...)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount: %v", err)
daemonize.SignalOutcome(err)
- return false
+ return true
}
util.OnInterrupt(func() {
@@ -128,9 +133,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
filerGrpcAddress, err := parseFilerGrpcAddress(filer)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("parseFilerGrpcAddress: %v", err)
daemonize.SignalOutcome(err)
- return false
+ return true
}
mountRoot := filerMountRootPath
@@ -142,14 +147,14 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{
FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
+ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
FilerMountRootPath: mountRoot,
Collection: collection,
Replication: replication,
TtlSec: int32(ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
DataCenter: dataCenter,
- DirListingLimit: dirListingLimit,
+ DirListCacheLimit: dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,
MountUid: uid,
MountGid: gid,
@@ -165,8 +170,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount process: %v", err)
daemonize.SignalOutcome(err)
+ return true
}
return true
diff --git a/weed/command/s3.go b/weed/command/s3.go
index e004bb066..10a486657 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,18 +1,17 @@
package command
import (
+ "fmt"
"net/http"
"time"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
- "fmt"
+ "github.com/gorilla/mux"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
)
var (
@@ -69,7 +68,7 @@ func (s3opt *S3Options) startS3Server() bool {
FilerGrpcAddress: filerGrpcAddress,
DomainName: *s3opt.domainName,
BucketsPath: *s3opt.filerBucketsPath,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
+ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 7a988cdcf..ab658735f 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -14,6 +14,14 @@ var cmdScaffold = &Command{
Short: "generate basic configuration files",
Long: `Generate filer.toml with all possible configurations for you to customize.
+ The options can also be overwritten by environment variables.
+ For example, the filer.toml mysql password can be overwritten by environment variable
+ export WEED_MYSQL_PASSWORD=some_password
+ Environment variable rules:
+ * Prefix fix with "WEED_"
+ * Upppercase the reset of variable name.
+ * Replace '.' with '_'
+
`,
}
@@ -59,14 +67,18 @@ const (
# $HOME/.seaweedfs/filer.toml
# /etc/seaweedfs/filer.toml
-[memory]
-# local in memory, mostly for testing purpose
-enabled = false
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer would check whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
-[leveldb]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-enabled = false
-dir = "." # directory to store level db files
+
+####################################################
+# The following are filer store options
+####################################################
[leveldb2]
# local on disk, mostly for simple single-machine setup, fairly scalable
@@ -74,10 +86,6 @@ dir = "." # directory to store level db files
enabled = true
dir = "." # directory to store level db files
-####################################################
-# multiple filers on shared storage, fairly scalable
-####################################################
-
[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
@@ -95,6 +103,7 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+interpolateParams = false
[postgres] # or cockroachdb
# CREATE TABLE IF NOT EXISTS filemeta (
@@ -144,6 +153,10 @@ addresses = [
"localhost:30006",
]
password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = true
+# automatically use the closest Redis server for reads
+routeByLatency = true
[etcd]
enabled = false
@@ -346,5 +359,32 @@ scripts = """
"""
sleep_minutes = 17 # sleep minutes between each script execution
+[master.filer]
+default_filer_url = "http://localhost:8888/"
+
+[master.sequencer]
+type = "memory" # Choose [memory|etcd] type for storing the file id sequence
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+ [storage.backend.s3.default]
+ enabled = false
+ aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ region = "us-east-2"
+ bucket = "your_bucket_name" # an existing bucket
+
+# create this number of logical volumes if no more writable volumes
+[master.volume_growth]
+count_1 = 7 # create 1 x 7 = 7 actual volumes
+count_2 = 6 # create 2 x 6 = 12 actual volumes
+count_3 = 3 # create 3 x 3 = 9 actual volumes
+count_other = 1 # create n x 1 = n actual volumes
+
`
)
diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go
new file mode 100644
index 000000000..423dacc32
--- /dev/null
+++ b/weed/command/scaffold_test.go
@@ -0,0 +1,44 @@
+package command
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/spf13/viper"
+)
+
+func TestReadingTomlConfiguration(t *testing.T) {
+
+ viper.SetConfigType("toml")
+
+ // any approach to require this configuration into your program.
+ var tomlExample = []byte(`
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+ # You can indent as you please. Tabs or spaces. TOML don't care.
+ [servers.alpha]
+ ip = "10.0.0.1"
+ dc = "eqdc10"
+
+ [servers.beta]
+ ip = "10.0.0.2"
+ dc = "eqdc10"
+
+`)
+
+ viper.ReadConfig(bytes.NewBuffer(tomlExample))
+
+ fmt.Printf("database is %v\n", viper.Get("database"))
+ fmt.Printf("servers is %v\n", viper.GetStringMap("servers"))
+
+ alpha := viper.Sub("servers.alpha")
+
+ fmt.Printf("alpha ip is %v\n", alpha.GetString("ip"))
+}
diff --git a/weed/command/server.go b/weed/command/server.go
index 87f404ed3..6aa68b6d2 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -89,6 +89,7 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 91aa8770a..dcf70608f 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -2,14 +2,10 @@ package command
import (
"fmt"
- "net/url"
- "strconv"
- "strings"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
var (
@@ -34,10 +30,10 @@ var cmdShell = &Command{
func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
- shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
var filerPwdErr error
- shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = parseFilerUrl(*shellInitialFilerUrl)
+ shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
if filerPwdErr != nil {
fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
return false
@@ -48,22 +44,3 @@ func runShell(command *Command, args []string) bool {
return true
}
-
-func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
- if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
- entryPath = "http://" + entryPath
- }
-
- var u *url.URL
- u, err = url.Parse(entryPath)
- if err != nil {
- return
- }
- filerServer = u.Hostname()
- portString := u.Port()
- if portString != "" {
- filerPort, err = strconv.ParseInt(portString, 10, 32)
- }
- path = u.Path
- return
-}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 25e938d9b..d71046131 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -6,11 +6,9 @@ import (
"os"
"path/filepath"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
-
- "github.com/chrislusf/seaweedfs/weed/operation"
)
var (
@@ -63,7 +61,7 @@ var cmdUpload = &Command{
func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if len(args) == 0 {
if *upload.dir == "" {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 3c1aa2b50..9d665d143 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"os"
"runtime"
@@ -9,15 +10,19 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/httpdown"
+
+ "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -44,6 +49,7 @@ type VolumeServerOptions struct {
cpuProfile *string
memProfile *string
compactionMBPerSecond *int
+ fileSizeLimitMB *int
}
func init() {
@@ -64,6 +70,7 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
}
var cmdVolume = &Command{
@@ -94,7 +101,7 @@ func runVolume(cmd *Command, args []string) bool {
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
- //Set multiple folders and each folder's max volume count limit'
+ // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
@@ -113,7 +120,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
- //security related white list configuration
+ // security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
@@ -128,11 +135,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
}
- isSeperatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
+ if v.isSeparatedPublicPort() {
publicVolumeMux = http.NewServeMux()
}
@@ -156,53 +162,134 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
*v.compactionMBPerSecond,
+ *v.fileSizeLimitMB,
)
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
- listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
+ // starting grpc server
+ grpcS := v.startGrpcService(volumeServer)
+
+ // starting public http server
+ var publicHttpDown httpdown.Server
+ if v.isSeparatedPublicPort() {
+ publicHttpDown = v.startPublicHttpService(publicVolumeMux)
+ if nil == publicHttpDown {
+ glog.Fatalf("start public http service failed")
}
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
}
+ // starting the cluster http server
+ clusterHttpServer := v.startClusterHttpService(volumeMux)
+
+ stopChain := make(chan struct{})
util.OnInterrupt(func() {
+ fmt.Println("volume server has be killed")
+ var startTime time.Time
+
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ startTime = time.Now()
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop public http server, elapsed %dms", delta)
+ }
+
+ startTime = time.Now()
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
+
+ startTime = time.Now()
+ grpcS.GracefulStop()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+
+ startTime = time.Now()
volumeServer.Shutdown()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+
pprof.StopCPUProfile()
+
+ close(stopChain) // notify exit
})
- // starting grpc server
+ select {
+ case <-stopChain:
+ }
+ glog.Warningf("the volume server exit.")
+}
+
+// check whether configure the public port
+func (v VolumeServerOptions) isSeparatedPublicPort() bool {
+ return *v.publicPort != *v.port
+}
+
+func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
- volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- if viper.GetString("https.volume.key") != "" {
- if e := http.ServeTLS(listener, volumeMux,
- viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ go func() {
+ if err := grpcS.Serve(grpcL); err != nil {
+ glog.Fatalf("start gRPC service failed, %s", err)
}
- } else {
- if e := http.Serve(listener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ }()
+ return grpcS
+}
+
+func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
+ publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
+ publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
+ publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
+ go func() {
+ if err := publicHttpDown.Wait(); err != nil {
+ glog.Errorf("public http down wait failed, %v", err)
}
+ }()
+
+ return publicHttpDown
+}
+
+func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
+ var (
+ certFile, keyFile string
+ )
+ if viper.GetString("https.volume.key") != "" {
+ certFile = viper.GetString("https.volume.cert")
+ keyFile = viper.GetString("https.volume.key")
}
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
+ listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ httpDown := httpdown.HTTP{
+ KillTimeout: 5 * time.Minute,
+ StopTimeout: 5 * time.Minute,
+ CertFile: certFile,
+ KeyFile: keyFile}
+ clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ go func() {
+ if e := clusterHttpServer.Wait(); e != nil {
+ glog.Fatalf("Volume server fail to serve: %v", e)
+ }
+ }()
+ return clusterHttpServer
}
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 371c4a9ad..0e6f89040 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -11,7 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
var (
@@ -75,7 +74,7 @@ func (wo *WebDavOption) startWebDav() bool {
ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
Filer: *wo.filer,
FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
+ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
Collection: *wo.collection,
Uid: uid,
Gid: gid,