aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2019-12-30 13:05:50 +0800
committerGitHub <noreply@github.com>2019-12-30 13:05:50 +0800
commit70da715d8d917527291b35fb069fac077d17b868 (patch)
treeb89bad02094cc7131bc2c9f64df13e15f9de9914 /weed/command
parent93a7df500ffeed766e395907e860b1733040ff23 (diff)
parent09043c8e5a3b43add589344d28d4f57e90c83f70 (diff)
downloadseaweedfs-70da715d8d917527291b35fb069fac077d17b868.tar.xz
seaweedfs-70da715d8d917527291b35fb069fac077d17b868.zip
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go66
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/compact.go7
-rw-r--r--weed/command/export.go24
-rw-r--r--weed/command/filer_copy.go118
-rw-r--r--weed/command/fix.go17
-rw-r--r--weed/command/master.go80
-rw-r--r--weed/command/mount.go10
-rw-r--r--weed/command/mount_darwin.go13
-rw-r--r--weed/command/mount_freebsd.go13
-rw-r--r--weed/command/mount_linux.go157
-rw-r--r--weed/command/mount_notsupported.go1
-rw-r--r--weed/command/mount_std.go42
-rw-r--r--weed/command/scaffold.go56
-rw-r--r--weed/command/scaffold_test.go44
-rw-r--r--weed/command/server.go77
-rw-r--r--weed/command/shell.go17
-rw-r--r--weed/command/volume.go146
-rw-r--r--weed/command/weedfuse/README.md84
-rw-r--r--weed/command/weedfuse/weedfuse.go109
20 files changed, 629 insertions, 454 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 31e146965..0f6bed225 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -3,10 +3,12 @@ package command
import (
"fmt"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -17,10 +19,12 @@ var (
)
type BackupOptions struct {
- master *string
- collection *string
- dir *string
- volumeId *int
+ master *string
+ collection *string
+ dir *string
+ volumeId *int
+ ttl *string
+ replication *string
}
func init() {
@@ -29,6 +33,15 @@ func init() {
s.collection = cmdBackup.Flag.String("collection", "", "collection name")
s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
+ s.ttl = cmdBackup.Flag.String("ttl", "", `backup volume's time to live, format:
+ 3m: 3 minutes
+ 4h: 4 hours
+ 5d: 5 days
+ 6w: 6 weeks
+ 7M: 7 months
+ 8y: 8 years
+ default is the same with origin`)
+ s.replication = cmdBackup.Flag.String("replication", "", "backup volume's replication, default is the same with origin")
}
var cmdBackup = &Command{
@@ -73,25 +86,42 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error get volume %d status: %v\n", vid, err)
return true
}
- ttl, err := needle.ReadTTL(stats.Ttl)
- if err != nil {
- fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
- return true
+ var ttl *needle.TTL
+ if *s.ttl != "" {
+ ttl, err = needle.ReadTTL(*s.ttl)
+ if err != nil {
+ fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
+ return true
+ }
+ } else {
+ ttl, err = needle.ReadTTL(stats.Ttl)
+ if err != nil {
+ fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
+ return true
+ }
}
- replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
- if err != nil {
- fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
- return true
+ var replication *super_block.ReplicaPlacement
+ if *s.replication != "" {
+ replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
+ if err != nil {
+ fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
+ return true
+ }
+ } else {
+ replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
+ if err != nil {
+ fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
+ return true
+ }
}
-
- v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
+ v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact(0, 0); err != nil {
+ if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
@@ -100,7 +130,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
- v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0)
+ v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
}
datSize, _, _ := v.FileStat()
@@ -109,7 +139,7 @@ func runBackup(cmd *Command, args []string) bool {
// remove the old data
v.Destroy()
// recreate an empty volume
- v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
+ v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index dd0fdb88e..26be1fe3a 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -69,7 +69,7 @@ func init() {
}
var cmdBenchmark = &Command{
- UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
+ UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty SeaweedFS file system.
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 79d50c095..85313b749 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -17,6 +17,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
+ For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
+ For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
+
`,
}
@@ -38,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
- storage.NeedleMapInMemory, nil, nil, preallocate)
+ storage.NeedleMapInMemory, nil, nil, preallocate, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
@@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(); err != nil {
+ if err = v.Compact2(preallocate); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/command/export.go b/weed/command/export.go
index 7e94ec11c..8d664ad3b 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"fmt"
+ "io"
"os"
"path"
"path/filepath"
@@ -12,11 +13,11 @@ import (
"text/template"
"time"
- "io"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -89,12 +90,12 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
type VolumeFileScanner4Export struct {
version needle.Version
counter int
- needleMap *storage.NeedleMap
+ needleMap *needle_map.MemDb
vid needle.VolumeId
}
-func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -102,7 +103,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
needleMap := scanner.needleMap
vid := scanner.vid
@@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool {
fileName = *export.collection + "_" + fileName
}
vid := needle.VolumeId(*export.volumeId)
- indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
- if err != nil {
- glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
+ needleMap := needle_map.NewMemDb()
+ if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
volumeFileScanner := &VolumeFileScanner4Export{
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 19aceb211..f14d18c52 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -29,16 +29,17 @@ var (
)
type CopyOptions struct {
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
- masterClient *wdclient.MasterClient
- concurrency *int
- compressionLevel *int
- grpcDialOption grpc.DialOption
- masters []string
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ masterClient *wdclient.MasterClient
+ concurrenctFiles *int
+ concurrenctChunks *int
+ compressionLevel *int
+ grpcDialOption grpc.DialOption
+ masters []string
}
func init() {
@@ -49,7 +50,8 @@ func init() {
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
- copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
@@ -131,7 +133,7 @@ func runCopy(cmd *Command, args []string) bool {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
- fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
+ fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
go func() {
defer close(fileCopyTaskChan)
@@ -142,7 +144,7 @@ func runCopy(cmd *Command, args []string) bool {
}
}
}()
- for i := 0; i < *copy.concurrency; i++ {
+ for i := 0; i < *copy.concurrenctFiles; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
@@ -345,41 +347,71 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
- var chunks []*filer_pb.FileChunk
+ chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
+
+ concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
+ var wg sync.WaitGroup
+ var uploadError error
+
+ fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
+ for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
+ wg.Add(1)
+ concurrentChunks <- struct{}{}
+ go func(i int64) {
+ defer func() {
+ wg.Done()
+ <-concurrentChunks
+ }()
+ // assign a volume
+ assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ Ttl: *worker.options.ttl,
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- for i := int64(0); i < int64(chunkCount); i++ {
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- // assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
- })
- if err != nil {
- fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
- }
+ uploadResult, err := operation.Upload(targetUrl,
+ fileName+"-"+strconv.FormatInt(i+1, 10),
+ io.NewSectionReader(f, i*chunkSize, chunkSize),
+ false, "", nil, assignResult.Auth)
+ if err != nil {
+ uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return
+ }
+ if uploadResult.Error != "" {
+ uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return
+ }
+ chunksChan <- &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ }
+ fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ }(i)
+ }
+ wg.Wait()
+ close(chunksChan)
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ var chunks []*filer_pb.FileChunk
+ for chunk := range chunksChan {
+ chunks = append(chunks, chunk)
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.LimitReader(f, chunkSize),
- false, "application/octet-stream", nil, assignResult.Auth)
- if err != nil {
- return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- }
- if uploadResult.Error != "" {
- return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ if uploadError != nil {
+ var fileIds []string
+ for _, chunk := range chunks {
+ fileIds = append(fileIds, chunk.FileId)
}
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
- fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+ return uploadError
}
if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/command/fix.go b/weed/command/fix.go
index bf33490cc..76bc19f7e 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -8,6 +8,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -31,11 +33,11 @@ var (
type VolumeFileScanner4Fix struct {
version needle.Version
- nm *storage.NeedleMap
+ nm *needle_map.MemDb
}
-func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -43,14 +45,14 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
return false
}
-func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
- pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id, types.ToOffset(offset))
+ return scanner.nm.Delete(n.Id)
}
return nil
}
@@ -72,8 +74,7 @@ func runFix(cmd *Command, args []string) bool {
}
defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
- defer nm.Close()
+ nm := needle_map.NewMemDb()
vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{
diff --git a/weed/command/master.go b/weed/command/master.go
index 9e9308468..8d0a3289c 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/spf13/viper"
@@ -72,8 +73,6 @@ var cmdMaster = &Command{
var (
masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file")
-
- masterWhiteList []string
)
func runMaster(cmd *Command, args []string) bool {
@@ -87,6 +86,8 @@ func runMaster(cmd *Command, args []string) bool {
if err := util.TestFolderWritable(*m.metaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
}
+
+ var masterWhiteList []string
if *m.whiteList != "" {
masterWhiteList = strings.Split(*m.whiteList, ",")
}
@@ -94,52 +95,54 @@ func runMaster(cmd *Command, args []string) bool {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
- r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, m.toMasterOption(masterWhiteList))
+ startMaster(m, masterWhiteList)
+
+ return true
+}
- listeningAddress := *m.ipBind + ":" + strconv.Itoa(*m.port)
+func startMaster(masterOption MasterOptions, masterWhiteList []string) {
- glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
+ backend.LoadConfiguration(viper.GetViper())
+ myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
+
+ r := mux.NewRouter()
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
+ listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
+ glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
+ // start raftServer
+ raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
+ peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
+ }
+ ms.SetRaftServer(raftServer)
+ r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
+ // starting grpc server
+ grpcPort := *masterOption.port + 10000
+ grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ if err != nil {
+ glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
+ }
+ // Create your protocol servers.
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
+ master_pb.RegisterSeaweedServer(grpcS, ms)
+ protobuf.RegisterRaftServer(grpcS, raftServer)
+ reflection.Register(grpcS)
+ glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort)
+ go grpcS.Serve(grpcL)
- go func() {
- // start raftServer
- myMasterAddress, peers := checkPeers(*m.ip, *m.port, *m.peers)
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
- peers, myMasterAddress, *m.metaFolder, ms.Topo, *m.pulseSeconds)
- if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *m.metaFolder)
- }
- ms.SetRaftServer(raftServer)
- r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
-
- // starting grpc server
- grpcPort := *m.port + 10000
- grpcL, err := util.NewListener(*m.ipBind+":"+strconv.Itoa(grpcPort), 0)
- if err != nil {
- glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
- }
- // Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
- master_pb.RegisterSeaweedServer(grpcS, ms)
- protobuf.RegisterRaftServer(grpcS, raftServer)
- reflection.Register(grpcS)
-
- glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *m.ipBind, grpcPort)
- grpcS.Serve(grpcL)
- }()
+ go ms.MasterClient.KeepConnectedToMaster()
// start http server
httpS := &http.Server{Handler: r}
- if err := httpS.Serve(masterListener); err != nil {
- glog.Fatalf("master server failed to serve: %v", err)
- }
+ go httpS.Serve(masterListener)
- return true
+ select {}
}
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
@@ -156,11 +159,10 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
}
}
- peerCount := len(cleanedPeers)
if !hasSelf {
- peerCount += 1
+ cleanedPeers = append(cleanedPeers, masterAddress)
}
- if peerCount%2 == 0 {
+ if len(cleanedPeers)%2 == 0 {
glog.Fatalf("Only odd number of masters are supported!")
}
return
diff --git a/weed/command/mount.go b/weed/command/mount.go
index ec790c999..f09b285f7 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -10,13 +10,14 @@ type MountOptions struct {
filer *string
filerMountRootPath *string
dir *string
- dirListingLimit *int
+ dirListCacheLimit *int64
collection *string
replication *string
ttlSec *int
chunkSizeLimitMB *int
dataCenter *string
allowOthers *bool
+ umaskString *string
}
var (
@@ -30,13 +31,14 @@ func init() {
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
- mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size")
+ mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
+ mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
}
@@ -62,12 +64,12 @@ var cmdMount = &Command{
func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
hostnameAndPort := strings.Split(filer, ":")
if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort)
+ return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
}
filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
if parseErr != nil {
- return "", fmt.Errorf("The filer filer port parse error: %v", parseErr)
+ return "", fmt.Errorf("filer port parse error: %v", parseErr)
}
filerGrpcPort := int(filerPort) + 10000
diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go
new file mode 100644
index 000000000..f0a5581e7
--- /dev/null
+++ b/weed/command/mount_darwin.go
@@ -0,0 +1,13 @@
+package command
+
+import (
+ "github.com/seaweedfs/fuse"
+)
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{}
+}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go
new file mode 100644
index 000000000..f0a5581e7
--- /dev/null
+++ b/weed/command/mount_freebsd.go
@@ -0,0 +1,13 @@
+package command
+
+import (
+ "github.com/seaweedfs/fuse"
+)
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{}
+}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
new file mode 100644
index 000000000..80a5f9da4
--- /dev/null
+++ b/weed/command/mount_linux.go
@@ -0,0 +1,157 @@
+package command
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+
+ "github.com/seaweedfs/fuse"
+)
+
+const (
+ /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue
+ (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11)
+
+ (1) mount ID: unique identifier of the mount (may be reused after umount)
+ (2) parent ID: ID of parent (or of self for the top of the mount tree)
+ (3) major:minor: value of st_dev for files on filesystem
+ (4) root: root of the mount within the filesystem
+ (5) mount point: mount point relative to the process's root
+ (6) mount options: per mount options
+ (7) optional fields: zero or more fields of the form "tag[:value]"
+ (8) separator: marks the end of the optional fields
+ (9) filesystem type: name of filesystem of the form "type[.subtype]"
+ (10) mount source: filesystem specific information or "none"
+ (11) super options: per super block options*/
+ mountinfoFormat = "%d %d %d:%d %s %s %s %s"
+)
+
+// Info reveals information about a particular mounted filesystem. This
+// struct is populated from the content in the /proc/<pid>/mountinfo file.
+type Info struct {
+ // ID is a unique identifier of the mount (may be reused after umount).
+ ID int
+
+ // Parent indicates the ID of the mount parent (or of self for the top of the
+ // mount tree).
+ Parent int
+
+ // Major indicates one half of the device ID which identifies the device class.
+ Major int
+
+ // Minor indicates one half of the device ID which identifies a specific
+ // instance of device.
+ Minor int
+
+ // Root of the mount within the filesystem.
+ Root string
+
+ // Mountpoint indicates the mount point relative to the process's root.
+ Mountpoint string
+
+ // Opts represents mount-specific options.
+ Opts string
+
+ // Optional represents optional fields.
+ Optional string
+
+ // Fstype indicates the type of filesystem, such as EXT3.
+ Fstype string
+
+ // Source indicates filesystem specific information or "none".
+ Source string
+
+ // VfsOpts represents per super block options.
+ VfsOpts string
+}
+
+// Mounted determines if a specified mountpoint has been mounted.
+// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab.
+func mounted(mountPoint string) (bool, error) {
+ entries, err := parseMountTable()
+ if err != nil {
+ return false, err
+ }
+
+ // Search the table for the mountPoint
+ for _, e := range entries {
+ if e.Mountpoint == mountPoint {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// Parse /proc/self/mountinfo because comparing Dev and ino does not work from
+// bind mounts
+func parseMountTable() ([]*Info, error) {
+ f, err := os.Open("/proc/self/mountinfo")
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ return parseInfoFile(f)
+}
+
+func parseInfoFile(r io.Reader) ([]*Info, error) {
+ var (
+ s = bufio.NewScanner(r)
+ out []*Info
+ )
+
+ for s.Scan() {
+ if err := s.Err(); err != nil {
+ return nil, err
+ }
+
+ var (
+ p = &Info{}
+ text = s.Text()
+ optionalFields string
+ )
+
+ if _, err := fmt.Sscanf(text, mountinfoFormat,
+ &p.ID, &p.Parent, &p.Major, &p.Minor,
+ &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil {
+ return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err)
+ }
+ // Safe as mountinfo encodes mountpoints with spaces as \040.
+ index := strings.Index(text, " - ")
+ postSeparatorFields := strings.Fields(text[index+3:])
+ if len(postSeparatorFields) < 3 {
+ return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text)
+ }
+
+ if optionalFields != "-" {
+ p.Optional = optionalFields
+ }
+
+ p.Fstype = postSeparatorFields[0]
+ p.Source = postSeparatorFields[1]
+ p.VfsOpts = strings.Join(postSeparatorFields[2:], " ")
+ out = append(out, p)
+ }
+ return out, nil
+}
+
+func osSpecificMountOptions() []fuse.MountOption {
+ return []fuse.MountOption{
+ fuse.AllowNonEmptyMount(),
+ }
+}
+
+func checkMountPointAvailable(dir string) bool {
+ mountPoint := dir
+ if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
+ mountPoint = mountPoint[0 : len(mountPoint)-1]
+ }
+
+ if mounted, err := mounted(mountPoint); err != nil || mounted {
+ return false
+ }
+
+ return true
+}
diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go
index 3bf22ddc4..f3c0de3d6 100644
--- a/weed/command/mount_notsupported.go
+++ b/weed/command/mount_notsupported.go
@@ -1,5 +1,6 @@
// +build !linux
// +build !darwin
+// +build !freebsd
package command
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 1d1214266..453531d00 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -1,4 +1,4 @@
-// +build linux darwin
+// +build linux darwin freebsd
package command
@@ -12,12 +12,12 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/jacobsa/daemonize"
"github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -27,6 +27,12 @@ func runMount(cmd *Command, args []string) bool {
util.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+ umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
+ if umaskErr != nil {
+ fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
+ return false
+ }
+
return RunMount(
*mountOptions.filer,
*mountOptions.filerMountRootPath,
@@ -37,12 +43,13 @@ func runMount(cmd *Command, args []string) bool {
*mountOptions.chunkSizeLimitMB,
*mountOptions.allowOthers,
*mountOptions.ttlSec,
- *mountOptions.dirListingLimit,
+ *mountOptions.dirListCacheLimit,
+ os.FileMode(umask),
)
}
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
- allowOthers bool, ttlSec int, dirListingLimit int) bool {
+ allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool {
util.LoadConfiguration("security", false)
@@ -81,12 +88,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
}
}
+ // Ensure target mount point availability
+ if isValid := checkMountPointAvailable(dir); !isValid {
+ glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ return false
+ }
+
mountName := path.Base(dir)
options := []fuse.MountOption{
fuse.VolumeName(mountName),
- fuse.FSName("SeaweedFS"),
- fuse.Subtype("SeaweedFS"),
+ fuse.FSName(filer + ":" + filerMountRootPath),
+ fuse.Subtype("seaweedfs"),
fuse.NoAppleDouble(),
fuse.NoAppleXattr(),
fuse.NoBrowse(),
@@ -100,15 +113,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
fuse.WritebackCache(),
fuse.AllowNonEmptyMount(),
}
+
+ options = append(options, osSpecificMountOptions()...)
+
if allowOthers {
options = append(options, fuse.AllowOther())
}
c, err := fuse.Mount(dir, options...)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount: %v", err)
daemonize.SignalOutcome(err)
- return false
+ return true
}
util.OnInterrupt(func() {
@@ -118,9 +134,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
filerGrpcAddress, err := parseFilerGrpcAddress(filer)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("parseFilerGrpcAddress: %v", err)
daemonize.SignalOutcome(err)
- return false
+ return true
}
mountRoot := filerMountRootPath
@@ -139,13 +155,14 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
TtlSec: int32(ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
DataCenter: dataCenter,
- DirListingLimit: dirListingLimit,
+ DirListCacheLimit: dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
+ Umask: umask,
}))
if err != nil {
fuse.Unmount(dir)
@@ -154,8 +171,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount process: %v", err)
daemonize.SignalOutcome(err)
+ return true
}
return true
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 062fe0ff8..a76466ed6 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -59,15 +59,6 @@ const (
# $HOME/.seaweedfs/filer.toml
# /etc/seaweedfs/filer.toml
-[memory]
-# local in memory, mostly for testing purpose
-enabled = false
-
-[leveldb]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-enabled = false
-dir = "." # directory to store level db files
-
[leveldb2]
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
@@ -78,7 +69,7 @@ dir = "." # directory to store level db files
# multiple filers on shared storage, fairly scalable
####################################################
-[mysql]
+[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
# name VARCHAR(1000) COMMENT 'directory or file name',
@@ -95,8 +86,9 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+interpolateParams = false
-[postgres]
+[postgres] # or cockroachdb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT,
# name VARCHAR(65535),
@@ -131,7 +123,7 @@ hosts=[
enabled = false
address = "localhost:6379"
password = ""
-db = 0
+database = 0
[redis_cluster]
enabled = false
@@ -144,6 +136,20 @@ addresses = [
"localhost:30006",
]
password = ""
+// allows reads from slave servers or the master, but all writes still go to the master
+readOnly = true
+// automatically use the closest Redis server for reads
+routeByLatency = true
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
+[tikv]
+enabled = false
+pdAddress = "192.168.199.113:2379"
+
`
@@ -217,22 +223,22 @@ grpcAddress = "localhost:18888"
# all files under this directory tree are replicated.
# this is not a directory on your hard drive, but on your filer.
# i.e., all files with this "prefix" are sent to notification message queue.
-directory = "/buckets"
+directory = "/buckets"
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
+# this is not a directory on your hard drive, but on your filer.
# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
+directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
+# default loads credentials from the shared credentials file (~/.aws/credentials).
enabled = false
aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
@@ -336,5 +342,23 @@ scripts = """
"""
sleep_minutes = 17 # sleep minutes between each script execution
+[master.filer]
+default_filer_url = "http://localhost:8888/"
+
+[master.sequencer]
+type = "memory" # Choose [memory|etcd] type for storing the file id sequence
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+
+
+[storage.backend]
+ [storage.backend.s3.default]
+ enabled = false
+ aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ region = "us-east-2"
+ bucket = "your_bucket_name" # an existing bucket
+
`
)
diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go
new file mode 100644
index 000000000..423dacc32
--- /dev/null
+++ b/weed/command/scaffold_test.go
@@ -0,0 +1,44 @@
+package command
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/spf13/viper"
+)
+
+func TestReadingTomlConfiguration(t *testing.T) {
+
+ viper.SetConfigType("toml")
+
+ // any approach to require this configuration into your program.
+ var tomlExample = []byte(`
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+ # You can indent as you please. Tabs or spaces. TOML don't care.
+ [servers.alpha]
+ ip = "10.0.0.1"
+ dc = "eqdc10"
+
+ [servers.beta]
+ ip = "10.0.0.2"
+ dc = "eqdc10"
+
+`)
+
+ viper.ReadConfig(bytes.NewBuffer(tomlExample))
+
+ fmt.Printf("database is %v\n", viper.Get("database"))
+ fmt.Printf("servers is %v\n", viper.GetStringMap("servers"))
+
+ alpha := viper.Sub("servers.alpha")
+
+ fmt.Printf("alpha ip is %v\n", alpha.GetString("ip"))
+}
diff --git a/weed/command/server.go b/weed/command/server.go
index f8c1d06fc..87f404ed3 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,25 +2,14 @@ package command
import (
"fmt"
- "net/http"
"os"
"runtime"
"runtime/pprof"
- "strconv"
"strings"
- "sync"
"time"
- "github.com/chrislusf/raft/protobuf"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
-
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
)
type ServerOptions struct {
@@ -132,14 +121,17 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- master := *serverIp + ":" + strconv.Itoa(*masterOptions.port)
+ _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
+ peers := strings.Join(peerList, ",")
+ masterOptions.peers = &peers
+
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = &master
+ filerOptions.masters = &peers
filerOptions.ip = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = &master
+ serverOptions.v.masters = &peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
@@ -198,59 +190,12 @@ func runServer(cmd *Command, args []string) bool {
}()
}
- var volumeWait sync.WaitGroup
-
- volumeWait.Add(1)
-
- go func() {
- r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOptions.toMasterOption(serverWhiteList))
-
- glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterOptions.port)
- masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterOptions.port), 0)
- if e != nil {
- glog.Fatalf("Master startup error: %v", e)
- }
-
- go func() {
- // start raftServer
- myMasterAddress, peers := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
- peers, myMasterAddress, *masterOptions.metaFolder, ms.Topo, *masterOptions.pulseSeconds)
- ms.SetRaftServer(raftServer)
- r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
-
- // starting grpc server
- grpcPort := *masterOptions.port + 10000
- grpcL, err := util.NewListener(*serverBindIp+":"+strconv.Itoa(grpcPort), 0)
- if err != nil {
- glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
- }
- // Create your protocol servers.
- glog.V(1).Infof("grpc config %+v", viper.Sub("grpc"))
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
- master_pb.RegisterSeaweedServer(grpcS, ms)
- protobuf.RegisterRaftServer(grpcS, raftServer)
- reflection.Register(grpcS)
-
- glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort)
- grpcS.Serve(grpcL)
- }()
-
- volumeWait.Done()
-
- // start http server
- httpS := &http.Server{Handler: r}
- if err := httpS.Serve(masterListener); err != nil {
- glog.Fatalf("master server failed to serve: %v", err)
- }
-
- }()
-
- volumeWait.Wait()
- time.Sleep(100 * time.Millisecond)
+ // start volume server
+ {
+ go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
+ }
- serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
+ startMaster(masterOptions, serverWhiteList)
return true
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 79f8b8bf9..34b5aef31 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -1,6 +1,8 @@
package command
import (
+ "fmt"
+
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -8,12 +10,14 @@ import (
)
var (
- shellOptions shell.ShellOptions
+ shellOptions shell.ShellOptions
+ shellInitialFilerUrl *string
)
func init() {
cmdShell.Run = runShell // break init cycle
shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url")
}
var cmdShell = &Command{
@@ -24,16 +28,17 @@ var cmdShell = &Command{
`,
}
-var ()
-
func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
- shellOptions.FilerHost = "localhost"
- shellOptions.FilerPort = 8888
- shellOptions.Directory = "/"
+ var filerPwdErr error
+ shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
+ if filerPwdErr != nil {
+ fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
+ return false
+ }
shell.RunShell(shellOptions)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 3c1aa2b50..3e8341ef8 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"os"
"runtime"
@@ -10,7 +11,9 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/httpdown"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -94,7 +97,7 @@ func runVolume(cmd *Command, args []string) bool {
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
- //Set multiple folders and each folder's max volume count limit'
+ // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
@@ -113,7 +116,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
- //security related white list configuration
+ // security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
@@ -128,11 +131,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
}
- isSeperatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
+ if v.isSeparatedPublicPort() {
publicVolumeMux = http.NewServeMux()
}
@@ -158,51 +160,131 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
)
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
- listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
+ // starting grpc server
+ grpcS := v.startGrpcService(volumeServer)
+
+ // starting public http server
+ var publicHttpDown httpdown.Server
+ if v.isSeparatedPublicPort() {
+ publicHttpDown = v.startPublicHttpService(publicVolumeMux)
+ if nil == publicHttpDown {
+ glog.Fatalf("start public http service failed")
}
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
}
+ // starting the cluster http server
+ clusterHttpServer := v.startClusterHttpService(volumeMux)
+
+ stopChain := make(chan struct{})
util.OnInterrupt(func() {
+ fmt.Println("volume server has be killed")
+ var startTime time.Time
+
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ startTime = time.Now()
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop public http server, elapsed %dms", delta)
+ }
+
+ startTime = time.Now()
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
+
+ startTime = time.Now()
+ grpcS.GracefulStop()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+
+ startTime = time.Now()
volumeServer.Shutdown()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+
pprof.StopCPUProfile()
+
+ close(stopChain) // notify exit
})
- // starting grpc server
+ select {
+ case <-stopChain:
+ }
+ glog.Warningf("the volume server exit.")
+}
+
+// check whether configure the public port
+func (v VolumeServerOptions) isSeparatedPublicPort() bool {
+ return *v.publicPort != *v.port
+}
+
+func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
- volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
+ volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- if viper.GetString("https.volume.key") != "" {
- if e := http.ServeTLS(listener, volumeMux,
- viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ go func() {
+ if err := grpcS.Serve(grpcL); err != nil {
+ glog.Fatalf("start gRPC service failed, %s", err)
}
- } else {
- if e := http.Serve(listener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ }()
+ return grpcS
+}
+
+func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
+ publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
+ publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
+ publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
+ go func() {
+ if err := publicHttpDown.Wait(); err != nil {
+ glog.Errorf("public http down wait failed, %v", err)
}
+ }()
+
+ return publicHttpDown
+}
+
+func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
+ var (
+ certFile, keyFile string
+ )
+ if viper.GetString("https.volume.key") != "" {
+ certFile = viper.GetString("https.volume.cert")
+ keyFile = viper.GetString("https.volume.key")
+ }
+
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
+ listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
}
+ httpDown := httpdown.HTTP{
+ KillTimeout: 5 * time.Minute,
+ StopTimeout: 5 * time.Minute,
+ CertFile: certFile,
+ KeyFile: keyFile}
+ clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ go func() {
+ if e := clusterHttpServer.Wait(); e != nil {
+ glog.Fatalf("Volume server fail to serve: %v", e)
+ }
+ }()
+ return clusterHttpServer
}
diff --git a/weed/command/weedfuse/README.md b/weed/command/weedfuse/README.md
deleted file mode 100644
index 1a1496bbb..000000000
--- a/weed/command/weedfuse/README.md
+++ /dev/null
@@ -1,84 +0,0 @@
-Mount the SeaweedFS via FUSE
-
-# Mount by fstab
-
-
-```
-$ # on linux
-$ sudo apt-get install fuse
-$ sudo echo 'user_allow_other' >> /etc/fuse.conf
-$ sudo mv weedfuse /sbin/mount.weedfuse
-
-$ # on Mac
-$ sudo mv weedfuse /sbin/mount_weedfuse
-
-```
-
-On both OS X and Linux, you can add one of the entries to your /etc/fstab file like the following:
-
-```
-# mount the whole SeaweedFS
-localhost:8888/ /home/some/mount/folder weedfuse
-
-# mount the SeaweedFS sub folder
-localhost:8888/sub/dir /home/some/mount/folder weedfuse
-
-# mount the SeaweedFS sub folder with some options
-localhost:8888/sub/dir /home/some/mount/folder weedfuse user
-
-```
-
-To verify it can work, try this command
-```
-$ sudo mount -av
-
-...
-
-/home/some/mount/folder : successfully mounted
-
-```
-
-If you see `successfully mounted`, try to access the mounted folder and verify everything works.
-
-
-To debug, run these:
-```
-
-$ weedfuse -foreground localhost:8888/ /home/some/mount/folder
-
-```
-
-
-To unmount the folder:
-```
-
-$ sudo umount /home/some/mount/folder
-
-```
-
-<!-- not working yet!
-
-# Mount by autofs
-
-AutoFS can mount a folder if accessed.
-
-```
-# install autofs
-$ sudo apt-get install autofs
-```
-
-Here is an example on how to mount a folder for all users under `/home` directory.
-Assuming there exists corresponding folders under `/home` on both local and SeaweedFS.
-
-Edit `/etc/auto.master` and `/etc/auto.weedfuse` file with these content
-```
-$ cat /etc/auto.master
-/home /etc/auto.weedfuse
-
-$ cat /etc/auto.weedfuse
-# map /home/<user> to localhost:8888/home/<user>
-* -fstype=weedfuse,rw,allow_other,foreground :localhost\:8888/home/&
-
-```
-
--->
diff --git a/weed/command/weedfuse/weedfuse.go b/weed/command/weedfuse/weedfuse.go
deleted file mode 100644
index 4c0d12874..000000000
--- a/weed/command/weedfuse/weedfuse.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "os"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/command"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/jacobsa/daemonize"
- "github.com/kardianos/osext"
-)
-
-var (
- fuseCommand = flag.NewFlagSet("weedfuse", flag.ContinueOnError)
- options = fuseCommand.String("o", "", "comma separated options rw,uid=xxx,gid=xxx")
- isForeground = fuseCommand.Bool("foreground", false, "starts as a daemon")
-)
-
-func main() {
-
- err := fuseCommand.Parse(os.Args[1:])
- if err != nil {
- glog.Fatal(err)
- }
- fmt.Printf("options: %v\n", *options)
-
- // seems this value is always empty, need to parse it differently
- optionsString := *options
- prev := ""
- for i, arg := range os.Args {
- fmt.Printf("args[%d]: %v\n", i, arg)
- if prev == "-o" {
- optionsString = arg
- }
- prev = arg
- }
-
- device := fuseCommand.Arg(0)
- mountPoint := fuseCommand.Arg(1)
-
- fmt.Printf("source: %v\n", device)
- fmt.Printf("target: %v\n", mountPoint)
-
- nouser := true
- for _, option := range strings.Split(optionsString, ",") {
- fmt.Printf("option: %v\n", option)
- switch option {
- case "user":
- nouser = false
- }
- }
-
- maybeSetupPath()
-
- if !*isForeground {
- startAsDaemon()
- return
- }
-
- parts := strings.SplitN(device, "/", 2)
- filer, filerPath := parts[0], parts[1]
-
- command.RunMount(
- filer, "/"+filerPath, mountPoint, "", "000", "",
- 4, !nouser, 0, 1000000)
-
-}
-
-func maybeSetupPath() {
- // sudo mount -av may not include PATH in some linux, e.g., Ubuntu
- hasPathEnv := false
- for _, e := range os.Environ() {
- if strings.HasPrefix(e, "PATH=") {
- hasPathEnv = true
- }
- fmt.Println(e)
- }
- if !hasPathEnv {
- os.Setenv("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
- }
-}
-
-func startAsDaemon() {
-
- // adapted from gcsfuse
-
- // Find the executable.
- var path string
- path, err := osext.Executable()
- if err != nil {
- glog.Fatalf("osext.Executable: %v", err)
- }
-
- // Set up arguments. Be sure to use foreground mode.
- args := append([]string{"-foreground"}, os.Args[1:]...)
-
- // Pass along PATH so that the daemon can find fusermount on Linux.
- env := []string{
- fmt.Sprintf("PATH=%s", os.Getenv("PATH")),
- }
-
- err = daemonize.Run(path, args, env, os.Stdout)
- if err != nil {
- glog.Fatalf("daemonize.Run: %v", err)
- }
-
-}