diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-12-30 13:05:50 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-12-30 13:05:50 +0800 |
| commit | 70da715d8d917527291b35fb069fac077d17b868 (patch) | |
| tree | b89bad02094cc7131bc2c9f64df13e15f9de9914 /weed/command | |
| parent | 93a7df500ffeed766e395907e860b1733040ff23 (diff) | |
| parent | 09043c8e5a3b43add589344d28d4f57e90c83f70 (diff) | |
| download | seaweedfs-70da715d8d917527291b35fb069fac077d17b868.tar.xz seaweedfs-70da715d8d917527291b35fb069fac077d17b868.zip | |
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 66 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 2 | ||||
| -rw-r--r-- | weed/command/compact.go | 7 | ||||
| -rw-r--r-- | weed/command/export.go | 24 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 118 | ||||
| -rw-r--r-- | weed/command/fix.go | 17 | ||||
| -rw-r--r-- | weed/command/master.go | 80 | ||||
| -rw-r--r-- | weed/command/mount.go | 10 | ||||
| -rw-r--r-- | weed/command/mount_darwin.go | 13 | ||||
| -rw-r--r-- | weed/command/mount_freebsd.go | 13 | ||||
| -rw-r--r-- | weed/command/mount_linux.go | 157 | ||||
| -rw-r--r-- | weed/command/mount_notsupported.go | 1 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 42 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 56 | ||||
| -rw-r--r-- | weed/command/scaffold_test.go | 44 | ||||
| -rw-r--r-- | weed/command/server.go | 77 | ||||
| -rw-r--r-- | weed/command/shell.go | 17 | ||||
| -rw-r--r-- | weed/command/volume.go | 146 | ||||
| -rw-r--r-- | weed/command/weedfuse/README.md | 84 | ||||
| -rw-r--r-- | weed/command/weedfuse/weedfuse.go | 109 |
20 files changed, 629 insertions, 454 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 31e146965..0f6bed225 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -3,10 +3,12 @@ package command import ( "fmt" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" @@ -17,10 +19,12 @@ var ( ) type BackupOptions struct { - master *string - collection *string - dir *string - volumeId *int + master *string + collection *string + dir *string + volumeId *int + ttl *string + replication *string } func init() { @@ -29,6 +33,15 @@ func init() { s.collection = cmdBackup.Flag.String("collection", "", "collection name") s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files") s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") + s.ttl = cmdBackup.Flag.String("ttl", "", `backup volume's time to live, format: + 3m: 3 minutes + 4h: 4 hours + 5d: 5 days + 6w: 6 weeks + 7M: 7 months + 8y: 8 years + default is the same with origin`) + s.replication = cmdBackup.Flag.String("replication", "", "backup volume's replication, default is the same with origin") } var cmdBackup = &Command{ @@ -73,25 +86,42 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error get volume %d status: %v\n", vid, err) return true } - ttl, err := needle.ReadTTL(stats.Ttl) - if err != nil { - fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) - return true + var ttl *needle.TTL + if *s.ttl != "" { + ttl, err = needle.ReadTTL(*s.ttl) + if err != nil { + fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err) + return true + } + } else { + ttl, err = needle.ReadTTL(stats.Ttl) + if err != nil { + fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) + return true + } } - replication, err := storage.NewReplicaPlacementFromString(stats.Replication) - if err != nil { - fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) - return true + var replication *super_block.ReplicaPlacement + if *s.replication != "" { + replication, err = super_block.NewReplicaPlacementFromString(*s.replication) + if err != nil { + fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err) + return true + } + } else { + replication, err = super_block.NewReplicaPlacementFromString(stats.Replication) + if err != nil { + fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) + return true + } } - - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact(0, 0); err != nil { + if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } @@ -100,7 +130,7 @@ func runBackup(cmd *Command, args []string) bool { return true } v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) - v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0) + v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) } datSize, _, _ := v.FileStat() @@ -109,7 +139,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index dd0fdb88e..26be1fe3a 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -69,7 +69,7 @@ func init() { } var cmdBenchmark = &Command{ - UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", + UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000", Short: "benchmark on writing millions of files and read out", Long: `benchmark on an empty SeaweedFS file system. diff --git a/weed/command/compact.go b/weed/command/compact.go index 79d50c095..85313b749 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -17,6 +17,9 @@ var cmdCompact = &Command{ The compacted .dat file is stored as .cpd file. The compacted .idx file is stored as .cpx file. + For method=0, it compacts based on the .dat file, works if .idx file is corrupted. + For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files. + `, } @@ -38,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool { vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, preallocate) + storage.NeedleMapInMemory, nil, nil, preallocate, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } @@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(); err != nil { + if err = v.Compact2(preallocate); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/command/export.go b/weed/command/export.go index 7e94ec11c..8d664ad3b 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -4,6 +4,7 @@ import ( "archive/tar" "bytes" "fmt" + "io" "os" "path" "path/filepath" @@ -12,11 +13,11 @@ import ( "text/template" "time" - "io" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -89,12 +90,12 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, type VolumeFileScanner4Export struct { version needle.Version counter int - needleMap *storage.NeedleMap + needleMap *needle_map.MemDb vid needle.VolumeId } -func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } @@ -102,7 +103,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { needleMap := scanner.needleMap vid := scanner.vid @@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool { fileName = *export.collection + "_" + fileName } vid := needle.VolumeId(*export.volumeId) - indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - needleMap, err := storage.LoadBtreeNeedleMap(indexFile) - if err != nil { - glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) + needleMap := needle_map.NewMemDb() + if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil { + glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err) } volumeFileScanner := &VolumeFileScanner4Export{ diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 19aceb211..f14d18c52 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -29,16 +29,17 @@ var ( ) type CopyOptions struct { - include *string - replication *string - collection *string - ttl *string - maxMB *int - masterClient *wdclient.MasterClient - concurrency *int - compressionLevel *int - grpcDialOption grpc.DialOption - masters []string + include *string + replication *string + collection *string + ttl *string + maxMB *int + masterClient *wdclient.MasterClient + concurrenctFiles *int + concurrenctChunks *int + compressionLevel *int + grpcDialOption grpc.DialOption + masters []string } func init() { @@ -49,7 +50,8 @@ func init() { copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") - copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } @@ -131,7 +133,7 @@ func runCopy(cmd *Command, args []string) bool { util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") } - fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency) + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles) go func() { defer close(fileCopyTaskChan) @@ -142,7 +144,7 @@ func runCopy(cmd *Command, args []string) bool { } } }() - for i := 0; i < *copy.concurrency; i++ { + for i := 0; i < *copy.concurrenctFiles; i++ { waitGroup.Add(1) go func() { defer waitGroup.Done() @@ -345,41 +347,71 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) - var chunks []*filer_pb.FileChunk + chunksChan := make(chan *filer_pb.FileChunk, chunkCount) + + concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks) + var wg sync.WaitGroup + var uploadError error + + fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount) + for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ { + wg.Add(1) + concurrentChunks <- struct{}{} + go func(i int64) { + defer func() { + wg.Done() + <-concurrentChunks + }() + // assign a volume + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, + }) + if err != nil { + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + } - for i := int64(0); i < int64(chunkCount); i++ { + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - // assign a volume - assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - Ttl: *worker.options.ttl, - }) - if err != nil { - fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) - } + uploadResult, err := operation.Upload(targetUrl, + fileName+"-"+strconv.FormatInt(i+1, 10), + io.NewSectionReader(f, i*chunkSize, chunkSize), + false, "", nil, assignResult.Auth) + if err != nil { + uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + return + } + if uploadResult.Error != "" { + uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return + } + chunksChan <- &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: i * chunkSize, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + } + fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + }(i) + } + wg.Wait() + close(chunksChan) - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + var chunks []*filer_pb.FileChunk + for chunk := range chunksChan { + chunks = append(chunks, chunk) + } - uploadResult, err := operation.Upload(targetUrl, - fileName+"-"+strconv.FormatInt(i+1, 10), - io.LimitReader(f, chunkSize), - false, "application/octet-stream", nil, assignResult.Auth) - if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + if uploadError != nil { + var fileIds []string + for _, chunk := range chunks { + fileIds = append(fileIds, chunk.FileId) } - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: i * chunkSize, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }) - fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds) + return uploadError } if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/command/fix.go b/weed/command/fix.go index bf33490cc..76bc19f7e 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -8,6 +8,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -31,11 +33,11 @@ var ( type VolumeFileScanner4Fix struct { version needle.Version - nm *storage.NeedleMap + nm *needle_map.MemDb } -func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } @@ -43,14 +45,14 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { - pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) + pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id, types.ToOffset(offset)) + return scanner.nm.Delete(n.Id) } return nil } @@ -72,8 +74,7 @@ func runFix(cmd *Command, args []string) bool { } defer indexFile.Close() - nm := storage.NewBtreeNeedleMap(indexFile) - defer nm.Close() + nm := needle_map.NewMemDb() vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ diff --git a/weed/command/master.go b/weed/command/master.go index 9e9308468..8d0a3289c 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "github.com/spf13/viper" @@ -72,8 +73,6 @@ var cmdMaster = &Command{ var ( masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") - - masterWhiteList []string ) func runMaster(cmd *Command, args []string) bool { @@ -87,6 +86,8 @@ func runMaster(cmd *Command, args []string) bool { if err := util.TestFolderWritable(*m.metaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) } + + var masterWhiteList []string if *m.whiteList != "" { masterWhiteList = strings.Split(*m.whiteList, ",") } @@ -94,52 +95,54 @@ func runMaster(cmd *Command, args []string) bool { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } - r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, m.toMasterOption(masterWhiteList)) + startMaster(m, masterWhiteList) + + return true +} - listeningAddress := *m.ipBind + ":" + strconv.Itoa(*m.port) +func startMaster(masterOption MasterOptions, masterWhiteList []string) { - glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) + backend.LoadConfiguration(viper.GetViper()) + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) + + r := mux.NewRouter() + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) + listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port) + glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress) masterListener, e := util.NewListener(listeningAddress, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } + // start raftServer + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder) + } + ms.SetRaftServer(raftServer) + r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + // starting grpc server + grpcPort := *masterOption.port + 10000 + grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0) + if err != nil { + glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) + } + // Create your protocol servers. + grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) + master_pb.RegisterSeaweedServer(grpcS, ms) + protobuf.RegisterRaftServer(grpcS, raftServer) + reflection.Register(grpcS) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort) + go grpcS.Serve(grpcL) - go func() { - // start raftServer - myMasterAddress, peers := checkPeers(*m.ip, *m.port, *m.peers) - raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), - peers, myMasterAddress, *m.metaFolder, ms.Topo, *m.pulseSeconds) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *m.metaFolder) - } - ms.SetRaftServer(raftServer) - r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") - - // starting grpc server - grpcPort := *m.port + 10000 - grpcL, err := util.NewListener(*m.ipBind+":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) - } - // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) - master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) - reflection.Register(grpcS) - - glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *m.ipBind, grpcPort) - grpcS.Serve(grpcL) - }() + go ms.MasterClient.KeepConnectedToMaster() // start http server httpS := &http.Server{Handler: r} - if err := httpS.Serve(masterListener); err != nil { - glog.Fatalf("master server failed to serve: %v", err) - } + go httpS.Serve(masterListener) - return true + select {} } func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { @@ -156,11 +159,10 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st } } - peerCount := len(cleanedPeers) if !hasSelf { - peerCount += 1 + cleanedPeers = append(cleanedPeers, masterAddress) } - if peerCount%2 == 0 { + if len(cleanedPeers)%2 == 0 { glog.Fatalf("Only odd number of masters are supported!") } return diff --git a/weed/command/mount.go b/weed/command/mount.go index ec790c999..f09b285f7 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -10,13 +10,14 @@ type MountOptions struct { filer *string filerMountRootPath *string dir *string - dirListingLimit *int + dirListCacheLimit *int64 collection *string replication *string ttlSec *int chunkSizeLimitMB *int dataCenter *string allowOthers *bool + umaskString *string } var ( @@ -30,13 +31,14 @@ func init() { mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server") mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") - mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size") + mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") + mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") } @@ -62,12 +64,12 @@ var cmdMount = &Command{ func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { hostnameAndPort := strings.Split(filer, ":") if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort) + return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) } filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) if parseErr != nil { - return "", fmt.Errorf("The filer filer port parse error: %v", parseErr) + return "", fmt.Errorf("filer port parse error: %v", parseErr) } filerGrpcPort := int(filerPort) + 10000 diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go new file mode 100644 index 000000000..f0a5581e7 --- /dev/null +++ b/weed/command/mount_darwin.go @@ -0,0 +1,13 @@ +package command + +import ( + "github.com/seaweedfs/fuse" +) + +func osSpecificMountOptions() []fuse.MountOption { + return []fuse.MountOption{} +} + +func checkMountPointAvailable(dir string) bool { + return true +} diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go new file mode 100644 index 000000000..f0a5581e7 --- /dev/null +++ b/weed/command/mount_freebsd.go @@ -0,0 +1,13 @@ +package command + +import ( + "github.com/seaweedfs/fuse" +) + +func osSpecificMountOptions() []fuse.MountOption { + return []fuse.MountOption{} +} + +func checkMountPointAvailable(dir string) bool { + return true +} diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go new file mode 100644 index 000000000..80a5f9da4 --- /dev/null +++ b/weed/command/mount_linux.go @@ -0,0 +1,157 @@ +package command + +import ( + "bufio" + "fmt" + "io" + "os" + "strings" + + "github.com/seaweedfs/fuse" +) + +const ( + /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue + (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11) + + (1) mount ID: unique identifier of the mount (may be reused after umount) + (2) parent ID: ID of parent (or of self for the top of the mount tree) + (3) major:minor: value of st_dev for files on filesystem + (4) root: root of the mount within the filesystem + (5) mount point: mount point relative to the process's root + (6) mount options: per mount options + (7) optional fields: zero or more fields of the form "tag[:value]" + (8) separator: marks the end of the optional fields + (9) filesystem type: name of filesystem of the form "type[.subtype]" + (10) mount source: filesystem specific information or "none" + (11) super options: per super block options*/ + mountinfoFormat = "%d %d %d:%d %s %s %s %s" +) + +// Info reveals information about a particular mounted filesystem. This +// struct is populated from the content in the /proc/<pid>/mountinfo file. +type Info struct { + // ID is a unique identifier of the mount (may be reused after umount). + ID int + + // Parent indicates the ID of the mount parent (or of self for the top of the + // mount tree). + Parent int + + // Major indicates one half of the device ID which identifies the device class. + Major int + + // Minor indicates one half of the device ID which identifies a specific + // instance of device. + Minor int + + // Root of the mount within the filesystem. + Root string + + // Mountpoint indicates the mount point relative to the process's root. + Mountpoint string + + // Opts represents mount-specific options. + Opts string + + // Optional represents optional fields. + Optional string + + // Fstype indicates the type of filesystem, such as EXT3. + Fstype string + + // Source indicates filesystem specific information or "none". + Source string + + // VfsOpts represents per super block options. + VfsOpts string +} + +// Mounted determines if a specified mountpoint has been mounted. +// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab. +func mounted(mountPoint string) (bool, error) { + entries, err := parseMountTable() + if err != nil { + return false, err + } + + // Search the table for the mountPoint + for _, e := range entries { + if e.Mountpoint == mountPoint { + return true, nil + } + } + return false, nil +} + +// Parse /proc/self/mountinfo because comparing Dev and ino does not work from +// bind mounts +func parseMountTable() ([]*Info, error) { + f, err := os.Open("/proc/self/mountinfo") + if err != nil { + return nil, err + } + defer f.Close() + + return parseInfoFile(f) +} + +func parseInfoFile(r io.Reader) ([]*Info, error) { + var ( + s = bufio.NewScanner(r) + out []*Info + ) + + for s.Scan() { + if err := s.Err(); err != nil { + return nil, err + } + + var ( + p = &Info{} + text = s.Text() + optionalFields string + ) + + if _, err := fmt.Sscanf(text, mountinfoFormat, + &p.ID, &p.Parent, &p.Major, &p.Minor, + &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil { + return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err) + } + // Safe as mountinfo encodes mountpoints with spaces as \040. + index := strings.Index(text, " - ") + postSeparatorFields := strings.Fields(text[index+3:]) + if len(postSeparatorFields) < 3 { + return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text) + } + + if optionalFields != "-" { + p.Optional = optionalFields + } + + p.Fstype = postSeparatorFields[0] + p.Source = postSeparatorFields[1] + p.VfsOpts = strings.Join(postSeparatorFields[2:], " ") + out = append(out, p) + } + return out, nil +} + +func osSpecificMountOptions() []fuse.MountOption { + return []fuse.MountOption{ + fuse.AllowNonEmptyMount(), + } +} + +func checkMountPointAvailable(dir string) bool { + mountPoint := dir + if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") { + mountPoint = mountPoint[0 : len(mountPoint)-1] + } + + if mounted, err := mounted(mountPoint); err != nil || mounted { + return false + } + + return true +} diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go index 3bf22ddc4..f3c0de3d6 100644 --- a/weed/command/mount_notsupported.go +++ b/weed/command/mount_notsupported.go @@ -1,5 +1,6 @@ // +build !linux // +build !darwin +// +build !freebsd package command diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 1d1214266..453531d00 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -1,4 +1,4 @@ -// +build linux darwin +// +build linux darwin freebsd package command @@ -12,12 +12,12 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/jacobsa/daemonize" "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" @@ -27,6 +27,12 @@ func runMount(cmd *Command, args []string) bool { util.SetupProfiling(*mountCpuProfile, *mountMemProfile) + umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64) + if umaskErr != nil { + fmt.Printf("can not parse umask %s", *mountOptions.umaskString) + return false + } + return RunMount( *mountOptions.filer, *mountOptions.filerMountRootPath, @@ -37,12 +43,13 @@ func runMount(cmd *Command, args []string) bool { *mountOptions.chunkSizeLimitMB, *mountOptions.allowOthers, *mountOptions.ttlSec, - *mountOptions.dirListingLimit, + *mountOptions.dirListCacheLimit, + os.FileMode(umask), ) } func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, - allowOthers bool, ttlSec int, dirListingLimit int) bool { + allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { util.LoadConfiguration("security", false) @@ -81,12 +88,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente } } + // Ensure target mount point availability + if isValid := checkMountPointAvailable(dir); !isValid { + glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir) + return false + } + mountName := path.Base(dir) options := []fuse.MountOption{ fuse.VolumeName(mountName), - fuse.FSName("SeaweedFS"), - fuse.Subtype("SeaweedFS"), + fuse.FSName(filer + ":" + filerMountRootPath), + fuse.Subtype("seaweedfs"), fuse.NoAppleDouble(), fuse.NoAppleXattr(), fuse.NoBrowse(), @@ -100,15 +113,18 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.WritebackCache(), fuse.AllowNonEmptyMount(), } + + options = append(options, osSpecificMountOptions()...) + if allowOthers { options = append(options, fuse.AllowOther()) } c, err := fuse.Mount(dir, options...) if err != nil { - glog.Fatal(err) + glog.V(0).Infof("mount: %v", err) daemonize.SignalOutcome(err) - return false + return true } util.OnInterrupt(func() { @@ -118,9 +134,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente filerGrpcAddress, err := parseFilerGrpcAddress(filer) if err != nil { - glog.Fatal(err) + glog.V(0).Infof("parseFilerGrpcAddress: %v", err) daemonize.SignalOutcome(err) - return false + return true } mountRoot := filerMountRootPath @@ -139,13 +155,14 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente TtlSec: int32(ttlSec), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, DataCenter: dataCenter, - DirListingLimit: dirListingLimit, + DirListCacheLimit: dirListCacheLimit, EntryCacheTtl: 3 * time.Second, MountUid: uid, MountGid: gid, MountMode: mountMode, MountCtime: fileInfo.ModTime(), MountMtime: time.Now(), + Umask: umask, })) if err != nil { fuse.Unmount(dir) @@ -154,8 +171,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente // check if the mount process has an error to report <-c.Ready if err := c.MountError; err != nil { - glog.Fatal(err) + glog.V(0).Infof("mount process: %v", err) daemonize.SignalOutcome(err) + return true } return true diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 062fe0ff8..a76466ed6 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -59,15 +59,6 @@ const ( # $HOME/.seaweedfs/filer.toml # /etc/seaweedfs/filer.toml -[memory] -# local in memory, mostly for testing purpose -enabled = false - -[leveldb] -# local on disk, mostly for simple single-machine setup, fairly scalable -enabled = false -dir = "." # directory to store level db files - [leveldb2] # local on disk, mostly for simple single-machine setup, fairly scalable # faster than previous leveldb, recommended. @@ -78,7 +69,7 @@ dir = "." # directory to store level db files # multiple filers on shared storage, fairly scalable #################################################### -[mysql] +[mysql] # or tidb # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', # name VARCHAR(1000) COMMENT 'directory or file name', @@ -95,8 +86,9 @@ password = "" database = "" # create or use an existing database connection_max_idle = 2 connection_max_open = 100 +interpolateParams = false -[postgres] +[postgres] # or cockroachdb # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT, # name VARCHAR(65535), @@ -131,7 +123,7 @@ hosts=[ enabled = false address = "localhost:6379" password = "" -db = 0 +database = 0 [redis_cluster] enabled = false @@ -144,6 +136,20 @@ addresses = [ "localhost:30006", ] password = "" +// allows reads from slave servers or the master, but all writes still go to the master +readOnly = true +// automatically use the closest Redis server for reads +routeByLatency = true + +[etcd] +enabled = false +servers = "localhost:2379" +timeout = "3s" + +[tikv] +enabled = false +pdAddress = "192.168.199.113:2379" + ` @@ -217,22 +223,22 @@ grpcAddress = "localhost:18888" # all files under this directory tree are replicated. # this is not a directory on your hard drive, but on your filer. # i.e., all files with this "prefix" are sent to notification message queue. -directory = "/buckets" +directory = "/buckets" [sink.filer] enabled = false grpcAddress = "localhost:18888" # all replicated files are under this directory tree -# this is not a directory on your hard drive, but on your filer. +# this is not a directory on your hard drive, but on your filer. # i.e., all received files will be "prefixed" to this directory. -directory = "/backup" +directory = "/backup" replication = "" collection = "" ttlSec = 0 [sink.s3] # read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html -# default loads credentials from the shared credentials file (~/.aws/credentials). +# default loads credentials from the shared credentials file (~/.aws/credentials). enabled = false aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). @@ -336,5 +342,23 @@ scripts = """ """ sleep_minutes = 17 # sleep minutes between each script execution +[master.filer] +default_filer_url = "http://localhost:8888/" + +[master.sequencer] +type = "memory" # Choose [memory|etcd] type for storing the file id sequence +# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence +# example : http://127.0.0.1:2379,http://127.0.0.1:2389 +sequencer_etcd_urls = "http://127.0.0.1:2379" + + +[storage.backend] + [storage.backend.s3.default] + enabled = false + aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). + aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). + region = "us-east-2" + bucket = "your_bucket_name" # an existing bucket + ` ) diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go new file mode 100644 index 000000000..423dacc32 --- /dev/null +++ b/weed/command/scaffold_test.go @@ -0,0 +1,44 @@ +package command + +import ( + "bytes" + "fmt" + "testing" + + "github.com/spf13/viper" +) + +func TestReadingTomlConfiguration(t *testing.T) { + + viper.SetConfigType("toml") + + // any approach to require this configuration into your program. + var tomlExample = []byte(` +[database] +server = "192.168.1.1" +ports = [ 8001, 8001, 8002 ] +connection_max = 5000 +enabled = true + +[servers] + + # You can indent as you please. Tabs or spaces. TOML don't care. + [servers.alpha] + ip = "10.0.0.1" + dc = "eqdc10" + + [servers.beta] + ip = "10.0.0.2" + dc = "eqdc10" + +`) + + viper.ReadConfig(bytes.NewBuffer(tomlExample)) + + fmt.Printf("database is %v\n", viper.Get("database")) + fmt.Printf("servers is %v\n", viper.GetStringMap("servers")) + + alpha := viper.Sub("servers.alpha") + + fmt.Printf("alpha ip is %v\n", alpha.GetString("ip")) +} diff --git a/weed/command/server.go b/weed/command/server.go index f8c1d06fc..87f404ed3 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,25 +2,14 @@ package command import ( "fmt" - "net/http" "os" "runtime" "runtime/pprof" - "strconv" "strings" - "sync" "time" - "github.com/chrislusf/raft/protobuf" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" - "google.golang.org/grpc/reflection" ) type ServerOptions struct { @@ -132,14 +121,17 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - master := *serverIp + ":" + strconv.Itoa(*masterOptions.port) + _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) + peers := strings.Join(peerList, ",") + masterOptions.peers = &peers + masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = &master + filerOptions.masters = &peers filerOptions.ip = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = &master + serverOptions.v.masters = &peers serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack @@ -198,59 +190,12 @@ func runServer(cmd *Command, args []string) bool { }() } - var volumeWait sync.WaitGroup - - volumeWait.Add(1) - - go func() { - r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOptions.toMasterOption(serverWhiteList)) - - glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterOptions.port) - masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterOptions.port), 0) - if e != nil { - glog.Fatalf("Master startup error: %v", e) - } - - go func() { - // start raftServer - myMasterAddress, peers := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) - raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), - peers, myMasterAddress, *masterOptions.metaFolder, ms.Topo, *masterOptions.pulseSeconds) - ms.SetRaftServer(raftServer) - r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") - - // starting grpc server - grpcPort := *masterOptions.port + 10000 - grpcL, err := util.NewListener(*serverBindIp+":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) - } - // Create your protocol servers. - glog.V(1).Infof("grpc config %+v", viper.Sub("grpc")) - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) - master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) - reflection.Register(grpcS) - - glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort) - grpcS.Serve(grpcL) - }() - - volumeWait.Done() - - // start http server - httpS := &http.Server{Handler: r} - if err := httpS.Serve(masterListener); err != nil { - glog.Fatalf("master server failed to serve: %v", err) - } - - }() - - volumeWait.Wait() - time.Sleep(100 * time.Millisecond) + // start volume server + { + go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) + } - serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) + startMaster(masterOptions, serverWhiteList) return true } diff --git a/weed/command/shell.go b/weed/command/shell.go index 79f8b8bf9..34b5aef31 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -1,6 +1,8 @@ package command import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/util" @@ -8,12 +10,14 @@ import ( ) var ( - shellOptions shell.ShellOptions + shellOptions shell.ShellOptions + shellInitialFilerUrl *string ) func init() { cmdShell.Run = runShell // break init cycle shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") + shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url") } var cmdShell = &Command{ @@ -24,16 +28,17 @@ var cmdShell = &Command{ `, } -var () - func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") - shellOptions.FilerHost = "localhost" - shellOptions.FilerPort = 8888 - shellOptions.Directory = "/" + var filerPwdErr error + shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl) + if filerPwdErr != nil { + fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr) + return false + } shell.RunShell(shellOptions) diff --git a/weed/command/volume.go b/weed/command/volume.go index 3c1aa2b50..3e8341ef8 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -1,6 +1,7 @@ package command import ( + "fmt" "net/http" "os" "runtime" @@ -10,7 +11,9 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util/httpdown" "github.com/spf13/viper" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -94,7 +97,7 @@ func runVolume(cmd *Command, args []string) bool { func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { - //Set multiple folders and each folder's max volume count limit' + // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { @@ -113,7 +116,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } - //security related white list configuration + // security related white list configuration if volumeWhiteListOption != "" { v.whiteList = strings.Split(volumeWhiteListOption, ",") } @@ -128,11 +131,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if *v.publicUrl == "" { *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) } - isSeperatedPublicPort := *v.publicPort != *v.port volumeMux := http.NewServeMux() publicVolumeMux := volumeMux - if isSeperatedPublicPort { + if v.isSeparatedPublicPort() { publicVolumeMux = http.NewServeMux() } @@ -158,51 +160,131 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, ) - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) - glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } - if isSeperatedPublicPort { - publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) + // starting grpc server + grpcS := v.startGrpcService(volumeServer) + + // starting public http server + var publicHttpDown httpdown.Server + if v.isSeparatedPublicPort() { + publicHttpDown = v.startPublicHttpService(publicVolumeMux) + if nil == publicHttpDown { + glog.Fatalf("start public http service failed") } - go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } - }() } + // starting the cluster http server + clusterHttpServer := v.startClusterHttpService(volumeMux) + + stopChain := make(chan struct{}) util.OnInterrupt(func() { + fmt.Println("volume server has be killed") + var startTime time.Time + + // firstly, stop the public http service to prevent from receiving new user request + if nil != publicHttpDown { + startTime = time.Now() + if err := publicHttpDown.Stop(); err != nil { + glog.Warningf("stop the public http server failed, %v", err) + } + delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("stop public http server, elapsed %dms", delta) + } + + startTime = time.Now() + if err := clusterHttpServer.Stop(); err != nil { + glog.Warningf("stop the cluster http server failed, %v", err) + } + delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta) + + startTime = time.Now() + grpcS.GracefulStop() + delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta) + + startTime = time.Now() volumeServer.Shutdown() + delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 + glog.V(0).Infof("stop volume server, elapsed [%d]", delta) + pprof.StopCPUProfile() + + close(stopChain) // notify exit }) - // starting grpc server + select { + case <-stopChain: + } + glog.Warningf("the volume server exit.") +} + +// check whether configure the public port +func (v VolumeServerOptions) isSeparatedPublicPort() bool { + return *v.publicPort != *v.port +} + +func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server { grpcPort := *v.port + 10000 grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume")) - volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer) + volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) - go grpcS.Serve(grpcL) - - if viper.GetString("https.volume.key") != "" { - if e := http.ServeTLS(listener, volumeMux, - viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) + go func() { + if err := grpcS.Serve(grpcL); err != nil { + glog.Fatalf("start gRPC service failed, %s", err) } - } else { - if e := http.Serve(listener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) + }() + return grpcS +} + +func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { + publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + + pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute} + publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener) + go func() { + if err := publicHttpDown.Wait(); err != nil { + glog.Errorf("public http down wait failed, %v", err) } + }() + + return publicHttpDown +} + +func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server { + var ( + certFile, keyFile string + ) + if viper.GetString("https.volume.key") != "" { + certFile = viper.GetString("https.volume.cert") + keyFile = viper.GetString("https.volume.key") + } + + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) } + httpDown := httpdown.HTTP{ + KillTimeout: 5 * time.Minute, + StopTimeout: 5 * time.Minute, + CertFile: certFile, + KeyFile: keyFile} + clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener) + go func() { + if e := clusterHttpServer.Wait(); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } + }() + return clusterHttpServer } diff --git a/weed/command/weedfuse/README.md b/weed/command/weedfuse/README.md deleted file mode 100644 index 1a1496bbb..000000000 --- a/weed/command/weedfuse/README.md +++ /dev/null @@ -1,84 +0,0 @@ -Mount the SeaweedFS via FUSE - -# Mount by fstab - - -``` -$ # on linux -$ sudo apt-get install fuse -$ sudo echo 'user_allow_other' >> /etc/fuse.conf -$ sudo mv weedfuse /sbin/mount.weedfuse - -$ # on Mac -$ sudo mv weedfuse /sbin/mount_weedfuse - -``` - -On both OS X and Linux, you can add one of the entries to your /etc/fstab file like the following: - -``` -# mount the whole SeaweedFS -localhost:8888/ /home/some/mount/folder weedfuse - -# mount the SeaweedFS sub folder -localhost:8888/sub/dir /home/some/mount/folder weedfuse - -# mount the SeaweedFS sub folder with some options -localhost:8888/sub/dir /home/some/mount/folder weedfuse user - -``` - -To verify it can work, try this command -``` -$ sudo mount -av - -... - -/home/some/mount/folder : successfully mounted - -``` - -If you see `successfully mounted`, try to access the mounted folder and verify everything works. - - -To debug, run these: -``` - -$ weedfuse -foreground localhost:8888/ /home/some/mount/folder - -``` - - -To unmount the folder: -``` - -$ sudo umount /home/some/mount/folder - -``` - -<!-- not working yet! - -# Mount by autofs - -AutoFS can mount a folder if accessed. - -``` -# install autofs -$ sudo apt-get install autofs -``` - -Here is an example on how to mount a folder for all users under `/home` directory. -Assuming there exists corresponding folders under `/home` on both local and SeaweedFS. - -Edit `/etc/auto.master` and `/etc/auto.weedfuse` file with these content -``` -$ cat /etc/auto.master -/home /etc/auto.weedfuse - -$ cat /etc/auto.weedfuse -# map /home/<user> to localhost:8888/home/<user> -* -fstype=weedfuse,rw,allow_other,foreground :localhost\:8888/home/& - -``` - ---> diff --git a/weed/command/weedfuse/weedfuse.go b/weed/command/weedfuse/weedfuse.go deleted file mode 100644 index 4c0d12874..000000000 --- a/weed/command/weedfuse/weedfuse.go +++ /dev/null @@ -1,109 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "strings" - - "github.com/chrislusf/seaweedfs/weed/command" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/jacobsa/daemonize" - "github.com/kardianos/osext" -) - -var ( - fuseCommand = flag.NewFlagSet("weedfuse", flag.ContinueOnError) - options = fuseCommand.String("o", "", "comma separated options rw,uid=xxx,gid=xxx") - isForeground = fuseCommand.Bool("foreground", false, "starts as a daemon") -) - -func main() { - - err := fuseCommand.Parse(os.Args[1:]) - if err != nil { - glog.Fatal(err) - } - fmt.Printf("options: %v\n", *options) - - // seems this value is always empty, need to parse it differently - optionsString := *options - prev := "" - for i, arg := range os.Args { - fmt.Printf("args[%d]: %v\n", i, arg) - if prev == "-o" { - optionsString = arg - } - prev = arg - } - - device := fuseCommand.Arg(0) - mountPoint := fuseCommand.Arg(1) - - fmt.Printf("source: %v\n", device) - fmt.Printf("target: %v\n", mountPoint) - - nouser := true - for _, option := range strings.Split(optionsString, ",") { - fmt.Printf("option: %v\n", option) - switch option { - case "user": - nouser = false - } - } - - maybeSetupPath() - - if !*isForeground { - startAsDaemon() - return - } - - parts := strings.SplitN(device, "/", 2) - filer, filerPath := parts[0], parts[1] - - command.RunMount( - filer, "/"+filerPath, mountPoint, "", "000", "", - 4, !nouser, 0, 1000000) - -} - -func maybeSetupPath() { - // sudo mount -av may not include PATH in some linux, e.g., Ubuntu - hasPathEnv := false - for _, e := range os.Environ() { - if strings.HasPrefix(e, "PATH=") { - hasPathEnv = true - } - fmt.Println(e) - } - if !hasPathEnv { - os.Setenv("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin") - } -} - -func startAsDaemon() { - - // adapted from gcsfuse - - // Find the executable. - var path string - path, err := osext.Executable() - if err != nil { - glog.Fatalf("osext.Executable: %v", err) - } - - // Set up arguments. Be sure to use foreground mode. - args := append([]string{"-foreground"}, os.Args[1:]...) - - // Pass along PATH so that the daemon can find fusermount on Linux. - env := []string{ - fmt.Sprintf("PATH=%s", os.Getenv("PATH")), - } - - err = daemonize.Run(path, args, env, os.Stdout) - if err != nil { - glog.Fatalf("daemonize.Run: %v", err) - } - -} |
