diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 54 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 47 | ||||
| -rw-r--r-- | weed/command/command.go | 2 | ||||
| -rw-r--r-- | weed/command/compact.go | 5 | ||||
| -rw-r--r-- | weed/command/export.go | 24 | ||||
| -rw-r--r-- | weed/command/filer.go | 33 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 315 | ||||
| -rw-r--r-- | weed/command/filer_export.go | 187 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 12 | ||||
| -rw-r--r-- | weed/command/fix.go | 13 | ||||
| -rw-r--r-- | weed/command/master.go | 133 | ||||
| -rw-r--r-- | weed/command/mount.go | 9 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 103 | ||||
| -rw-r--r-- | weed/command/s3.go | 47 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 113 | ||||
| -rw-r--r-- | weed/command/server.go | 168 | ||||
| -rw-r--r-- | weed/command/shell.go | 63 | ||||
| -rw-r--r-- | weed/command/upload.go | 24 | ||||
| -rw-r--r-- | weed/command/volume.go | 39 | ||||
| -rw-r--r-- | weed/command/webdav.go | 109 | ||||
| -rw-r--r-- | weed/command/weedfuse/README.md | 84 | ||||
| -rw-r--r-- | weed/command/weedfuse/weedfuse.go | 109 |
22 files changed, 1090 insertions, 603 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 072aea75b..31e146965 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -3,6 +3,11 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -30,26 +35,30 @@ var cmdBackup = &Command{ UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333", Short: "incrementally backup a volume to local folder", Long: `Incrementally backup volume data. - + It is expected that you use this inside a script, to loop through all possible volume ids that needs to be backup to local folder. - + The volume id does not need to exist locally or even remotely. This will help to backup future new volumes. - + Usually backing up is just copying the .dat (and .idx) files. But it's tricky to incrementally copy the differences. - + The complexity comes when there are multiple addition, deletion and compaction. - This tool will handle them correctly and efficiently, avoiding unnecessary data transporation. + This tool will handle them correctly and efficiently, avoiding unnecessary data transportation. 
`, } func runBackup(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + if *s.volumeId == -1 { return false } - vid := storage.VolumeId(*s.volumeId) + vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info lookup, err := operation.Lookup(*s.master, vid.String()) @@ -59,12 +68,12 @@ func runBackup(cmd *Command, args []string) bool { } volumeServer := lookup.Locations[0].Url - stats, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid)) + stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) if err != nil { fmt.Printf("Error get volume %d status: %v\n", vid, err) return true } - ttl, err := storage.ReadTTL(stats.Ttl) + ttl, err := needle.ReadTTL(stats.Ttl) if err != nil { fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) return true @@ -81,7 +90,34 @@ func runBackup(cmd *Command, args []string) bool { return true } - if err := v.Synchronize(volumeServer); err != nil { + if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { + if err = v.Compact(0, 0); err != nil { + fmt.Printf("Compact Volume before synchronizing %v\n", err) + return true + } + if err = v.CommitCompact(); err != nil { + fmt.Printf("Commit Compact before synchronizing %v\n", err) + return true + } + v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) + v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0) + } + + datSize, _, _ := v.FileStat() + + if datSize > stats.TailOffset { + // remove the old data + v.Destroy() + // recreate an empty volume + v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) + if err != nil { + fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) + return true + } + } + defer v.Close() + + if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil { fmt.Printf("Error synchronizing volume 
%d: %v\n", vid, err) return true } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 60fd88ccd..dd0fdb88e 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -15,6 +15,9 @@ import ( "sync" "time" + "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" @@ -33,15 +36,17 @@ type BenchmarkOptions struct { read *bool sequentialRead *bool collection *string + replication *string cpuprofile *string maxCpu *int - secretKey *string + grpcDialOption grpc.DialOption + masterClient *wdclient.MasterClient } var ( - b BenchmarkOptions - sharedBytes []byte - masterClient *wdclient.MasterClient + b BenchmarkOptions + sharedBytes []byte + isSecure bool ) func init() { @@ -57,9 +62,9 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") + b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 
0 means all available CPUs") - b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") sharedBytes = make([]byte, 1024) } @@ -102,6 +107,10 @@ var ( ) func runBenchmark(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { *b.maxCpu = runtime.NumCPU() @@ -116,9 +125,9 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ",")) - go masterClient.KeepConnectedToMaster() - masterClient.WaitUntilConnected() + b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ",")) + go b.masterClient.KeepConnectedToMaster() + b.masterClient.WaitUntilConnected() if *b.write { benchWrite() @@ -188,7 +197,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { defer wait.Done() delayedDeleteChan := make(chan *delayedFile, 100) var waitForDeletions sync.WaitGroup - secret := security.Secret(*b.secretKey) for i := 0; i < 7; i++ { waitForDeletions.Add(1) @@ -198,8 +206,11 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if df.enterTime.After(time.Now()) { time.Sleep(df.enterTime.Sub(time.Now())) } - if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid, - security.GenJwt(secret, df.fp.Fid)); e == nil { + var jwtAuthorization security.EncodedJwt + if isSecure { + jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid) + } + if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ } else { s.failed++ @@ -219,12 +230,16 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { 
MimeType: "image/bench", // prevent gzip benchmark content } ar := &operation.VolumeAssignRequest{ - Count: 1, - Collection: *b.collection, + Count: 1, + Collection: *b.collection, + Replication: *b.replication, } - if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil { + if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection - if _, err := fp.Upload(0, masterClient.GetMaster(), secret); err == nil { + if !isSecure && assignResult.Auth != "" { + isSecure = true + } + if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -264,7 +279,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { fmt.Printf("reading file %s\n", fid) } start := time.Now() - url, err := masterClient.LookupFileId(fid) + url, err := b.masterClient.LookupFileId(fid) if err != nil { s.failed++ println("!!!! 
", fid, " location not found!!!!!") diff --git a/weed/command/command.go b/weed/command/command.go index 91b9bf3fc..79c00d4cd 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -13,7 +13,6 @@ var Commands = []*Command{ cmdCompact, cmdCopy, cmdFix, - cmdFilerExport, cmdFilerReplicate, cmdServer, cmdMaster, @@ -27,6 +26,7 @@ var Commands = []*Command{ cmdVolume, cmdExport, cmdMount, + cmdWebDav, } type Command struct { diff --git a/weed/command/compact.go b/weed/command/compact.go index 0dd4efe0e..79d50c095 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -3,6 +3,7 @@ package command import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func init() { @@ -35,14 +36,14 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) - vid := storage.VolumeId(*compactVolumeId) + vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } if *compactMethod == 0 { - if err = v.Compact(preallocate); err != nil { + if err = v.Compact(preallocate, 0); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { diff --git a/weed/command/export.go b/weed/command/export.go index 5c7e064ce..7e94ec11c 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -12,10 +12,12 @@ import ( "text/template" "time" + "io" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" - "io" ) const ( @@ -66,10 +68,10 @@ var ( localLocation, _ = time.LoadLocation("Local") ) -func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Version, deleted bool) { - key := 
storage.NewFileIdFromNeedle(vid, n).String() +func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool) { + key := needle.NewFileIdFromNeedle(vid, n).String() size := n.DataSize - if version == storage.Version1 { + if version == needle.Version1 { size = n.Size } fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\n", @@ -85,10 +87,10 @@ func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Versio } type VolumeFileScanner4Export struct { - version storage.Version + version needle.Version counter int needleMap *storage.NeedleMap - vid storage.VolumeId + vid needle.VolumeId } func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error { @@ -100,14 +102,14 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Export) VisitNeedle(n *storage.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error { needleMap := scanner.needleMap vid := scanner.vid nv, ok := needleMap.Get(n.Id) glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped(), ok, nv) - if ok && nv.Size > 0 && int64(nv.Offset)*types.NeedlePaddingSize == offset { + if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset { if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) { glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d", n.LastModified, newerThanUnix) @@ -189,7 +191,7 @@ func runExport(cmd *Command, args []string) bool { if *export.collection != "" { fileName = *export.collection + "_" + fileName } - vid := storage.VolumeId(*export.volumeId) + vid := needle.VolumeId(*export.volumeId) indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) if err != nil { glog.Fatalf("Create 
Volume Index [ERROR] %s\n", err) @@ -225,8 +227,8 @@ type nameParams struct { Ext string } -func writeFile(vid storage.VolumeId, n *storage.Needle) (err error) { - key := storage.NewFileIdFromNeedle(vid, n).String() +func writeFile(vid needle.VolumeId, n *needle.Needle) (err error) { + key := needle.NewFileIdFromNeedle(vid, n).String() fileNameTemplateBuffer.Reset() if err = fileNameTemplate.Execute(fileNameTemplateBuffer, nameParams{ diff --git a/weed/command/filer.go b/weed/command/filer.go index 0c1950f96..b1ceb46f5 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -6,6 +6,9 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" @@ -21,17 +24,16 @@ type FilerOptions struct { masters *string ip *string port *int - grpcPort *int publicPort *int collection *string defaultReplicaPlacement *string redirectOnRead *bool disableDirListing *bool maxMB *int - secretKey *string dirListingLimit *int dataCenter *string enableNotification *bool + disableHttp *bool // default leveldb directory, used in "weed server" mode defaultLevelDbDirectory *string @@ -43,15 +45,14 @@ func init() { f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") - f.grpcPort = cmdFiler.Flag.Int("port.grpc", 0, "filer grpc server listen port, default to http port + 10000") - f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public") + f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = 
cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit") - f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size") f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center") + f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") } var cmdFiler = &Command{ @@ -70,13 +71,15 @@ var cmdFiler = &Command{ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order. - The example filer.toml configuration file can be generated by "weed scaffold filer" + The example filer.toml configuration file can be generated by "weed scaffold -config=filer" `, } func runFiler(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) + f.startFiler() return true @@ -91,22 +94,23 @@ func (fo *FilerOptions) startFiler() { publicVolumeMux = http.NewServeMux() } - defaultLevelDbDirectory := "./filerdb" + defaultLevelDbDirectory := "./filerldb2" if fo.defaultLevelDbDirectory != nil { - defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerdb" + defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerldb2" } fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*f.masters, ","), + Masters: strings.Split(*fo.masters, ","), Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, RedirectOnRead: *fo.redirectOnRead, DisableDirListing: *fo.disableDirListing, MaxMB: *fo.maxMB, - SecretKey: *fo.secretKey, DirListingLimit: *fo.dirListingLimit, DataCenter: 
*fo.dataCenter, DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Port: *fo.port, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) @@ -128,7 +132,7 @@ func (fo *FilerOptions) startFiler() { glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.VERSION, *fo.ip, *fo.port) filerListener, e := util.NewListener( - ":"+strconv.Itoa(*fo.port), + *fo.ip+":"+strconv.Itoa(*fo.port), time.Duration(10)*time.Second, ) if e != nil { @@ -136,15 +140,12 @@ func (fo *FilerOptions) startFiler() { } // starting grpc server - grpcPort := *fo.grpcPort - if grpcPort == 0 { - grpcPort = *fo.port + 10000 - } + grpcPort := *fo.port + 10000 grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer() + grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 3638bcb27..19aceb211 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -1,52 +1,56 @@ package command import ( + "context" "fmt" + "io" "io/ioutil" + "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" - "context" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "io" - "net/http" - "strconv" - "time" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/spf13/viper" + "google.golang.org/grpc" ) var ( - copy CopyOptions + copy CopyOptions + waitGroup sync.WaitGroup ) type CopyOptions struct { - filerGrpcPort *int - master *string - include *string - replication *string - collection *string - ttl *string - maxMB *int - secretKey *string - - secret security.Secret + include 
*string + replication *string + collection *string + ttl *string + maxMB *int + masterClient *wdclient.MasterClient + concurrency *int + compressionLevel *int + grpcDialOption grpc.DialOption + masters []string } func init() { cmdCopy.Run = runCopy // break init cycle cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") - copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location") copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") copy.replication = cmdCopy.Flag.String("replication", "", "replication type") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") - copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") - copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") + copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } var cmdCopy = &Command{ @@ -66,7 +70,9 @@ var cmdCopy = &Command{ } func runCopy(cmd *Command, args []string) bool { - copy.secret = security.Secret(*copy.secretKey) + + util.LoadConfiguration("security", false) + if len(args) <= 1 { return false } @@ -96,67 +102,170 @@ func runCopy(cmd *Command, args []string) bool { } filerGrpcPort := filerPort + 10000 - if *copy.filerGrpcPort != 0 { - filerGrpcPort = uint64(*copy.filerGrpcPort) + filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + + ctx 
:= context.Background() + + masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress) + if err != nil { + fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + return false } + if *copy.collection == "" { + *copy.collection = collection + } + if *copy.replication == "" { + *copy.replication = replication + } + if *copy.maxMB == 0 { + *copy.maxMB = int(maxMB) + } + copy.masters = masters - filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters) + go copy.masterClient.KeepConnectedToMaster() + copy.masterClient.WaitUntilConnected() - for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) { - return false + if *cmdCopy.IsDebug { + util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") + } + + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency) + + go func() { + defer close(fileCopyTaskChan) + for _, fileOrDir := range fileOrDirs { + if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err) + break + } } + }() + for i := 0; i < *copy.concurrency; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + worker := FileCopyWorker{ + options: ©, + filerHost: filerUrl.Host, + filerGrpcAddress: filerGrpcAddress, + } + if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) + return + } + }() } + waitGroup.Wait() + return true } -func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool { - f, err := os.Open(fileOrDir) - if err != nil { - fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) - return false - } - defer f.Close() +func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) 
(masters []string, collection, replication string, maxMB uint32, err error) { + err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb + return nil + }) + return +} - fi, err := f.Stat() +func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error { + + fi, err := os.Stat(fileOrDir) if err != nil { - fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err) - return false + fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err) + return nil } mode := fi.Mode() if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") { - return false + if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { + return err } } - return true + return nil } + uid, gid := util.GetFileUidGid(fi) + + fileCopyTaskChan <- FileCopyTask{ + sourceLocation: fileOrDir, + destinationUrlPath: destPath, + fileSize: fi.Size(), + fileMode: fi.Mode(), + uid: uid, + gid: gid, + } + + return nil +} + +type FileCopyWorker struct { + options *CopyOptions + filerHost string + filerGrpcAddress string +} + +func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error { + for task := range fileCopyTaskChan { + if err := worker.doEachCopy(ctx, task); err != nil { + return err + } + } + return nil +} + +type FileCopyTask struct { + sourceLocation string + destinationUrlPath string + fileSize int64 + fileMode os.FileMode + uid uint32 + gid uint32 +} + +func (worker 
*FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error { + + f, err := os.Open(task.sourceLocation) + if err != nil { + fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err) + if _, ok := err.(*os.PathError); ok { + fmt.Printf("skipping %s\n", task.sourceLocation) + return nil + } + return err + } + defer f.Close() + // this is a regular file - if *copy.include != "" { - if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok { - return true + if *worker.options.include != "" { + if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok { + return nil } } // find the chunk count - chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkSize := int64(*worker.options.maxMB * 1024 * 1024) chunkCount := 1 - if chunkSize > 0 && fi.Size() > chunkSize { - chunkCount = int(fi.Size()/chunkSize) + 1 + if chunkSize > 0 && task.fileSize > chunkSize { + chunkCount = int(task.fileSize/chunkSize) + 1 } if chunkCount == 1 { - return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi) + return worker.uploadFileAsOne(ctx, task, f) } - return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize) + return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize) } -func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool { +func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error { // upload the file content fileName := filepath.Base(f.Name()) @@ -164,29 +273,27 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f var chunks []*filer_pb.FileChunk - if fi.Size() > 0 { + if task.fileSize > 0 { // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, 
&operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, "") + uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) @@ -198,43 +305,42 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f ETag: uploadResult.ETag, }) - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: 
uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - return true + return nil } -func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -244,14 +350,14 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, for i := int64(0); i < int64(chunkCount); i++ { // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - 
fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid @@ -259,14 +365,12 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(f, chunkSize), - false, "application/octet-stream", nil, "") + false, "application/octet-stream", nil, assignResult.Auth) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } chunks = append(chunks, &filer_pb.FileChunk{ FileId: assignResult.Fid, @@ -278,39 +382,38 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: 
*copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) - return true + return nil } func detectMimeType(f *os.File) string { @@ -329,15 +432,11 @@ func detectMimeType(f *os.File) string { return mimeType } -func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { - - grpcConnection, err := util.GrpcDial(filerAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", filerAddress, err) - } - defer grpcConnection.Close() +func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(clientConn) + return fn(client) + }, filerAddress, grpcDialOption) - return fn(client) } diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go deleted file mode 100644 index 7a2e7920a..000000000 --- a/weed/command/filer_export.go +++ /dev/null @@ -1,187 +0,0 @@ -package command - -import ( - "github.com/chrislusf/seaweedfs/weed/filer2" - 
"github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/notification" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/spf13/viper" -) - -func init() { - cmdFilerExport.Run = runFilerExport // break init cycle -} - -var cmdFilerExport = &Command{ - UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra", - Short: "export meta data in filer store", - Long: `Iterate the file tree and export all metadata out - - Both source and target store: - * should be a store name already specified in filer.toml - * do not need to be enabled state - - If target store is empty, only the directory tree will be listed. - - If target store is "notification", the list of entries will be sent to notification. - This is usually used to bootstrap filer replication to a remote system. - - `, -} - -var ( - // filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree") - filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml, default to currently enabled store") - filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue") - dir = cmdFilerExport.Flag.String("dir", "/", "only process files under this directory") - dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size") - dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data") - verboseFilerExport = cmdFilerExport.Flag.Bool("v", false, "verbose entry details") -) - -type statistics struct { - directoryCount int - fileCount int -} - -func runFilerExport(cmd *Command, args []string) bool { - - weed_server.LoadConfiguration("filer", true) - config := viper.GetViper() - - var sourceStore, targetStore filer2.FilerStore - - for _, store := range filer2.Stores { - if 
store.GetName() == *filerExportSourceStore || *filerExportSourceStore == "" && config.GetBool(store.GetName()+".enabled") { - viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize source store for %s: %+v", - store.GetName(), err) - } else { - sourceStore = store - } - break - } - } - - for _, store := range filer2.Stores { - if store.GetName() == *filerExportTargetStore { - viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize target store for %s: %+v", - store.GetName(), err) - } else { - targetStore = store - } - break - } - } - - if sourceStore == nil { - glog.Errorf("Failed to find source store %s", *filerExportSourceStore) - println("existing data sources are:") - for _, store := range filer2.Stores { - println(" " + store.GetName()) - } - return false - } - - if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" { - glog.Errorf("Failed to find target store %s", *filerExportTargetStore) - println("existing data sources are:") - for _, store := range filer2.Stores { - println(" " + store.GetName()) - } - return false - } - - stat := statistics{} - - var fn func(level int, entry *filer2.Entry) error - - if *filerExportTargetStore == "notification" { - weed_server.LoadConfiguration("notification", false) - v := viper.GetViper() - notification.LoadConfiguration(v.Sub("notification")) - - fn = func(level int, entry *filer2.Entry) error { - printout(level, entry) - if *dryRun { - return nil - } - return notification.Queue.SendMessage( - string(entry.FullPath), - &filer_pb.EventNotification{ - NewEntry: entry.ToProtoEntry(), - }, - ) - } - } else if targetStore == nil { - fn = printout - } else { - fn = func(level int, entry *filer2.Entry) error { - printout(level, entry) - if *dryRun { - return nil - } - return targetStore.InsertEntry(entry) - } - } - - doTraverse(&stat, 
sourceStore, filer2.FullPath(*dir), 0, fn) - - glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount) - - return true -} - -func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) { - - limit := *dirListLimit - lastEntryName := "" - for { - entries, err := filerStore.ListDirectoryEntries(parentPath, lastEntryName, false, limit) - if err != nil { - break - } - for _, entry := range entries { - if fnErr := fn(level, entry); fnErr != nil { - glog.Errorf("failed to process entry: %s", entry.FullPath) - } - if entry.IsDirectory() { - stat.directoryCount++ - doTraverse(stat, filerStore, entry.FullPath, level+1, fn) - } else { - stat.fileCount++ - } - } - if len(entries) < limit { - break - } - } -} - -func printout(level int, entry *filer2.Entry) error { - for i := 0; i < level; i++ { - if i == level-1 { - print("+-") - } else { - print("| ") - } - } - print(entry.FullPath.Name()) - if *verboseFilerExport { - for _, chunk := range entry.Chunks { - print("[") - print(chunk.FileId) - print(",") - print(chunk.Offset) - print(",") - print(chunk.Size) - print(")") - } - } - println() - return nil -} diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 3384e4023..c6e7f5dba 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -1,6 +1,7 @@ package command import ( + "context" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -12,7 +13,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" - "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" ) @@ -28,15 +29,16 @@ var cmdFilerReplicate = &Command{ filer.replicate listens on filer notifications. 
If any file is updated, it will fetch the updated content, and write to the other destination. - Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters. + Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters. `, } func runFilerReplicate(cmd *Command, args []string) bool { - weed_server.LoadConfiguration("replication", true) - weed_server.LoadConfiguration("notification", true) + util.LoadConfiguration("security", false) + util.LoadConfiguration("replication", true) + util.LoadConfiguration("notification", true) config := viper.GetViper() var notificationInput sub.NotificationInput @@ -115,7 +117,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } else { glog.V(1).Infof("modify: %s", key) } - if err = replicator.Replicate(key, m); err != nil { + if err = replicator.Replicate(context.Background(), key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) } else { glog.V(1).Infof("replicated %s", key) diff --git a/weed/command/fix.go b/weed/command/fix.go index a800978c6..bf33490cc 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -29,7 +30,7 @@ var ( ) type VolumeFileScanner4Fix struct { - version storage.Version + version needle.Version nm *storage.NeedleMap } @@ -42,14 +43,14 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *storage.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) - if n.Size > 0 { - pe := scanner.nm.Put(n.Id, 
types.Offset(offset/types.NeedlePaddingSize), n.Size) + if n.Size > 0 && n.Size != types.TombstoneFileSize { + pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize)) + return scanner.nm.Delete(n.Id, types.ToOffset(offset)) } return nil } @@ -74,7 +75,7 @@ func runFix(cmd *Command, args []string) bool { nm := storage.NewBtreeNeedleMap(indexFile) defer nm.Close() - vid := storage.VolumeId(*fixVolumeId) + vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ nm: nm, } diff --git a/weed/command/master.go b/weed/command/master.go index bd2267b9e..9e9308468 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -6,76 +6,98 @@ import ( "runtime" "strconv" "strings" - "time" + "github.com/chrislusf/raft/protobuf" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/spf13/viper" "google.golang.org/grpc/reflection" ) +var ( + m MasterOptions +) + +type MasterOptions struct { + port *int + ip *string + ipBind *string + metaFolder *string + peers *string + volumeSizeLimitMB *uint + volumePreallocate *bool + pulseSeconds *int + defaultReplication *string + garbageThreshold *float64 + whiteList *string + disableHttp *bool + metricsAddress *string + metricsIntervalSec *int +} + func init() { cmdMaster.Run = runMaster // break init cycle + m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") + m.ip = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address") + m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta 
data") + m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") + m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") + m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") + m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") + m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") + m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") + m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") } var cmdMaster = &Command{ UsageLine: "master -port=9333", Short: "start a master server", - Long: `start a master server to provide volume=>location mapping service - and sequence number of file ids + Long: `start a master server to provide volume=>location mapping service and sequence number of file ids + + The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order. 
+ + The example security.toml configuration file can be generated by "weed scaffold -config=security" `, } var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - mGrpcPort = cmdMaster.Flag.Int("port.grpc", 0, "grpc server listen port, default to http port + 10000") - masterIp = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address") - masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") - volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") - // mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") - masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. 
No limit if empty.") - masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") - masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") - masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") + masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") + masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") masterWhiteList []string ) func runMaster(cmd *Command, args []string) bool { - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) + + util.LoadConfiguration("security", false) + util.LoadConfiguration("master", false) + + runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*masterCpuProfile, *masterMemProfile) - if err := util.TestFolderWritable(*metaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) + if err := util.TestFolderWritable(*m.metaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) } - if *masterWhiteListOption != "" { - masterWhiteList = strings.Split(*masterWhiteListOption, ",") + if *m.whiteList != "" { + masterWhiteList = strings.Split(*m.whiteList, ",") } - if *volumeSizeLimitMB > 30*1000 { + if *m.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, *mport, *metaFolder, - *volumeSizeLimitMB, *volumePreallocate, - *mpulse, *defaultReplicaPlacement, *garbageThreshold, - masterWhiteList, *masterSecureKey, - ) + ms := weed_server.NewMasterServer(r, m.toMasterOption(masterWhiteList)) - listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) + listeningAddress := *m.ipBind + ":" + strconv.Itoa(*m.port) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) @@ -85,28 +107,29 @@ func runMaster(cmd 
*Command, args []string) bool { } go func() { - time.Sleep(100 * time.Millisecond) - myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers) - raftServer := weed_server.NewRaftServer(r, peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) + // start raftServer + myMasterAddress, peers := checkPeers(*m.ip, *m.port, *m.peers) + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + peers, myMasterAddress, *m.metaFolder, ms.Topo, *m.pulseSeconds) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *m.metaFolder) + } ms.SetRaftServer(raftServer) - }() + r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") - go func() { // starting grpc server - grpcPort := *mGrpcPort - if grpcPort == 0 { - grpcPort = *mport + 10000 - } - grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *m.port + 10000 + grpcL, err := util.NewListener(*m.ipBind+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. 
- grpcS := util.NewGrpcServer() + grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) master_pb.RegisterSeaweedServer(grpcS, ms) + protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) - glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *m.ipBind, grpcPort) grpcS.Serve(grpcL) }() @@ -142,3 +165,19 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st } return } + +func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { + return &weed_server.MasterOption{ + Port: *m.port, + MetaFolder: *m.metaFolder, + VolumeSizeLimitMB: *m.volumeSizeLimitMB, + VolumePreallocate: *m.volumePreallocate, + PulseSeconds: *m.pulseSeconds, + DefaultReplicaPlacement: *m.defaultReplication, + GarbageThreshold: *m.garbageThreshold, + WhiteList: whiteList, + DisableHttp: *m.disableHttp, + MetricsAddress: *m.metricsAddress, + MetricsIntervalSec: *m.metricsIntervalSec, + } +} diff --git a/weed/command/mount.go b/weed/command/mount.go index e61f16783..ec790c999 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -8,7 +8,6 @@ import ( type MountOptions struct { filer *string - filerGrpcPort *int filerMountRootPath *string dir *string dirListingLimit *int @@ -17,6 +16,7 @@ type MountOptions struct { ttlSec *int chunkSizeLimitMB *int dataCenter *string + allowOthers *bool } var ( @@ -28,7 +28,6 @@ var ( func init() { cmdMount.Run = runMount // break init cycle mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") - mountOptions.filerGrpcPort = cmdMount.Flag.Int("filer.grpc.port", 0, "filer grpc server listen port, default to http port + 10000") mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server") mountOptions.dir = cmdMount.Flag.String("dir", 
".", "mount weed filer to this directory") mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size") @@ -37,6 +36,7 @@ func init() { mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") + mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") } @@ -59,7 +59,7 @@ var cmdMount = &Command{ `, } -func parseFilerGrpcAddress(filer string, optionalGrpcPort int) (filerGrpcAddress string, err error) { +func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { hostnameAndPort := strings.Split(filer, ":") if len(hostnameAndPort) != 2 { return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort) @@ -71,9 +71,6 @@ func parseFilerGrpcAddress(filer string, optionalGrpcPort int) (filerGrpcAddress } filerGrpcPort := int(filerPort) + 10000 - if optionalGrpcPort != 0 { - filerGrpcPort = optionalGrpcPort - } return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil } diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2937b9ef1..1d1214266 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -6,11 +6,16 @@ import ( "fmt" "os" "os/user" + "path" "runtime" "strconv" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/jacobsa/daemonize" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" @@ -19,40 +24,67 @@ import ( ) func runMount(cmd 
*Command, args []string) bool { + + util.SetupProfiling(*mountCpuProfile, *mountMemProfile) + + return RunMount( + *mountOptions.filer, + *mountOptions.filerMountRootPath, + *mountOptions.dir, + *mountOptions.collection, + *mountOptions.replication, + *mountOptions.dataCenter, + *mountOptions.chunkSizeLimitMB, + *mountOptions.allowOthers, + *mountOptions.ttlSec, + *mountOptions.dirListingLimit, + ) +} + +func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, + allowOthers bool, ttlSec int, dirListingLimit int) bool { + + util.LoadConfiguration("security", false) + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) - if *mountOptions.dir == "" { + if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") return false } - if *mountOptions.chunkSizeLimitMB <= 0 { + if chunkSizeLimitMB <= 0 { fmt.Printf("Please specify a reasonable buffer size.") return false } - fuse.Unmount(*mountOptions.dir) + fuse.Unmount(dir) + + uid, gid := uint32(0), uint32(0) // detect mount folder mode mountMode := os.ModeDir | 0755 - if fileInfo, err := os.Stat(*mountOptions.dir); err == nil { + fileInfo, err := os.Stat(dir) + if err == nil { mountMode = os.ModeDir | fileInfo.Mode() + uid, gid = util.GetFileUidGid(fileInfo) + fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode()) } - // detect current user - uid, gid := uint32(0), uint32(0) - if u, err := user.Current(); err == nil { - if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { - uid = uint32(parsedId) - } - if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { - gid = uint32(parsedId) + if uid == 0 { + if u, err := user.Current(); err == nil { + if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { + uid = uint32(parsedId) + } + if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { + gid = uint32(parsedId) + } + fmt.Printf("current uid=%d 
gid=%d\n", uid, gid) } } - util.SetupProfiling(*mountCpuProfile, *mountMemProfile) + mountName := path.Base(dir) - c, err := fuse.Mount( - *mountOptions.dir, - fuse.VolumeName("SeaweedFS"), + options := []fuse.MountOption{ + fuse.VolumeName(mountName), fuse.FSName("SeaweedFS"), fuse.Subtype("SeaweedFS"), fuse.NoAppleDouble(), @@ -61,56 +93,69 @@ func runMount(cmd *Command, args []string) bool { fuse.AutoXattr(), fuse.ExclCreate(), fuse.DaemonTimeout("3600"), - fuse.AllowOther(), fuse.AllowSUID(), fuse.DefaultPermissions(), - fuse.MaxReadahead(1024*128), + fuse.MaxReadahead(1024 * 128), fuse.AsyncRead(), fuse.WritebackCache(), - ) + fuse.AllowNonEmptyMount(), + } + if allowOthers { + options = append(options, fuse.AllowOther()) + } + + c, err := fuse.Mount(dir, options...) if err != nil { glog.Fatal(err) + daemonize.SignalOutcome(err) return false } util.OnInterrupt(func() { - fuse.Unmount(*mountOptions.dir) + fuse.Unmount(dir) c.Close() }) - filerGrpcAddress, err := parseFilerGrpcAddress(*mountOptions.filer, *mountOptions.filerGrpcPort) + filerGrpcAddress, err := parseFilerGrpcAddress(filer) if err != nil { glog.Fatal(err) + daemonize.SignalOutcome(err) return false } - mountRoot := *mountOptions.filerMountRootPath + mountRoot := filerMountRootPath if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") { mountRoot = mountRoot[0 : len(mountRoot)-1] } + daemonize.SignalOutcome(nil) + err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ FilerGrpcAddress: filerGrpcAddress, + GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), FilerMountRootPath: mountRoot, - Collection: *mountOptions.collection, - Replication: *mountOptions.replication, - TtlSec: int32(*mountOptions.ttlSec), - ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMB) * 1024 * 1024, - DataCenter: *mountOptions.dataCenter, - DirListingLimit: *mountOptions.dirListingLimit, + Collection: collection, + Replication: replication, + TtlSec: int32(ttlSec), + ChunkSizeLimit: 
int64(chunkSizeLimitMB) * 1024 * 1024, + DataCenter: dataCenter, + DirListingLimit: dirListingLimit, EntryCacheTtl: 3 * time.Second, MountUid: uid, MountGid: gid, MountMode: mountMode, + MountCtime: fileInfo.ModTime(), + MountMtime: time.Now(), })) if err != nil { - fuse.Unmount(*mountOptions.dir) + fuse.Unmount(dir) } // check if the mount process has an error to report <-c.Ready if err := c.MountError; err != nil { glog.Fatal(err) + daemonize.SignalOutcome(err) } return true diff --git a/weed/command/s3.go b/weed/command/s3.go index 16a9490ff..e004bb066 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -4,7 +4,11 @@ import ( "net/http" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/s3api" "github.com/chrislusf/seaweedfs/weed/util" @@ -12,12 +16,11 @@ import ( ) var ( - s3options S3Options + s3StandaloneOptions S3Options ) type S3Options struct { filer *string - filerGrpcPort *int filerBucketsPath *string port *int domainName *string @@ -27,13 +30,12 @@ type S3Options struct { func init() { cmdS3.Run = runS3 // break init cycle - s3options.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") - s3options.filerGrpcPort = cmdS3.Flag.Int("filer.grpcPort", 0, "filer server grpc port, default to filer http port plus 10000") - s3options.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets") - s3options.port = cmdS3.Flag.Int("port", 8333, "s3options server http listen port") - s3options.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}") - s3options.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") - s3options.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") + s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer 
server address") + s3StandaloneOptions.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets") + s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") + s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}") + s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") + s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") } var cmdS3 = &Command{ @@ -46,7 +48,15 @@ var cmdS3 = &Command{ func runS3(cmd *Command, args []string) bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*s3options.filer, *s3options.filerGrpcPort) + util.LoadConfiguration("security", false) + + return s3StandaloneOptions.startS3Server() + +} + +func (s3opt *S3Options) startS3Server() bool { + + filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false @@ -55,10 +65,11 @@ func runS3(cmd *Command, args []string) bool { router := mux.NewRouter().SkipClean(true) _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: *s3options.filer, + Filer: *s3opt.filer, FilerGrpcAddress: filerGrpcAddress, - DomainName: *s3options.domainName, - BucketsPath: *s3options.filerBucketsPath, + DomainName: *s3opt.domainName, + BucketsPath: *s3opt.filerBucketsPath, + GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) @@ -66,22 +77,22 @@ func runS3(cmd *Command, args []string) bool { httpS := &http.Server{Handler: router} - listenAddress := fmt.Sprintf(":%d", *s3options.port) + listenAddress := fmt.Sprintf(":%d", *s3opt.port) s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) if err != nil { glog.Fatalf("S3 API Server listener on %s error: %v", 
listenAddress, err) } - if *s3options.tlsPrivateKey != "" { - if err = httpS.ServeTLS(s3ApiListener, *s3options.tlsCertificate, *s3options.tlsPrivateKey); err != nil { + if *s3opt.tlsPrivateKey != "" { + glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3opt.port) + if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } - glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3options.port) } else { + glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3opt.port) if err = httpS.Serve(s3ApiListener); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } - glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3options.port) } return true diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index ec0723859..062fe0ff8 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -10,7 +10,7 @@ func init() { } var cmdScaffold = &Command{ - UsageLine: "scaffold [filer]", + UsageLine: "scaffold -config=[filer|notification|replication|security|master]", Short: "generate basic configuration files", Long: `Generate filer.toml with all possible configurations for you to customize. 
@@ -19,7 +19,7 @@ var cmdScaffold = &Command{ var ( outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory") - config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication] the configuration file to generate") + config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate") ) func runScaffold(cmd *Command, args []string) bool { @@ -32,6 +32,10 @@ func runScaffold(cmd *Command, args []string) bool { content = NOTIFICATION_TOML_EXAMPLE case "replication": content = REPLICATION_TOML_EXAMPLE + case "security": + content = SECURITY_TOML_EXAMPLE + case "master": + content = MASTER_TOML_EXAMPLE } if content == "" { println("need a valid -config option") @@ -61,6 +65,12 @@ enabled = false [leveldb] # local on disk, mostly for simple single-machine setup, fairly scalable +enabled = false +dir = "." # directory to store level db files + +[leveldb2] +# local on disk, mostly for simple single-machine setup, fairly scalable +# faster than previous leveldb, recommended. enabled = true dir = "." # directory to store level db files @@ -70,12 +80,13 @@ dir = "." 
# directory to store level db files [mysql] # CREATE TABLE IF NOT EXISTS filemeta ( -# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', -# name VARCHAR(1000) COMMENT 'directory or file name', -# directory VARCHAR(4096) COMMENT 'full path to parent directory', -# meta BLOB, +# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', +# name VARCHAR(1000) COMMENT 'directory or file name', +# directory TEXT COMMENT 'full path to parent directory', +# meta LONGBLOB, # PRIMARY KEY (dirhash, name) # ) DEFAULT CHARSET=utf8; + enabled = false hostname = "localhost" port = 3306 @@ -88,8 +99,8 @@ connection_max_open = 100 [postgres] # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT, -# name VARCHAR(1000), -# directory VARCHAR(4096), +# name VARCHAR(65535), +# directory VARCHAR(65535), # meta bytea, # PRIMARY KEY (dirhash, name) # ); @@ -132,6 +143,7 @@ addresses = [ "localhost:30005", "localhost:30006", ] +password = "" ` @@ -178,6 +190,17 @@ google_application_credentials = "/path/to/x.json" # path to json credential fil project_id = "" # an existing project id topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists +[notification.gocdk_pub_sub] +# The Go Cloud Development Kit (https://gocloud.dev). +# PubSub API (https://godoc.org/gocloud.dev/pubsub). +# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ. +enabled = false +# This URL will Dial the RabbitMQ server at the URL in the environment +# variable RABBIT_SERVER_URL and open the exchange "myexchange". +# The exchange must have already been created by some other means, like +# the RabbitMQ management plugin. 
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
`

	REPLICATION_TOML_EXAMPLE = `
@@ -240,4 +263,78 @@ bucket = "mybucket" # an existing bucket
 directory = "/" # destination directory
 `
+
+	SECURITY_TOML_EXAMPLE = `
+# Put this file in one of these locations, with descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt defaults to expire after 10 seconds.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# jwt for read is only supported with master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PEM files.
+# the host name is not checked, so the PEM files can be shared.
+[grpc]
+ca = ""
+
+[grpc.volume]
+cert = ""
+key = ""
+
+[grpc.master]
+cert = ""
+key = ""
+
+[grpc.filer]
+cert = ""
+key = ""
+
+# use this for any place that needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+
+# volume server https options
+# Note: work in progress!
+# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
+
+`
+
+	MASTER_TOML_EXAMPLE = `
+# Put this file in one of these locations, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# periodically run these scripts, the same as running them from 'weed shell'
+scripts = """
+	ec.encode -fullPercent=95 -quietFor=1h
+	ec.rebuild -force
+	ec.balance -force
+	volume.balance -force
+"""
+sleep_minutes = 17 # sleep minutes between each script execution
+
+`
 )
diff --git a/weed/command/server.go b/weed/command/server.go
index ba5305a97..f8c1d06fc 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -1,6 +1,7 @@ package command
 import (
+	"fmt"
 	"net/http"
 	"os"
 	"runtime"
@@ -10,6 +11,10 @@ import (
 	"sync"
 	"time"
+	"github.com/chrislusf/raft/protobuf"
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/spf13/viper"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/server"
@@ -25,7 +30,9 @@ type ServerOptions struct {
 var (
 	serverOptions ServerOptions
+	masterOptions MasterOptions
 	filerOptions  FilerOptions
+	s3Options     S3Options
 )
 func init() {
@@ -34,51 +41,52 @@ func init() {
 var cmdServer = &Command{
 	UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name",
-	Short:     "start a server, including volume server, and automatically elect a master server",
+	Short:     "start a master server, a volume server, and optionally a filer and a S3 gateway",
 	Long: `start both a volume server to provide storage spaces
 	and a master server to provide volume=>location mapping service and sequence number of file ids
 	This is provided as a convenient way to start both volume server and master server.
-	The servers are exactly the same as starting them separately.
-
-	So other volume servers can use this embedded master server also.
+	The servers act exactly the same as starting them separately.
+	So other volume servers can connect to this master server also.
-	Optionally, one filer server can be started. Logically, filer servers should not be in a cluster.
-	They run with meta data on disk, not shared. So each filer server is different.
+	Optionally, a filer server can be started.
+	Also optionally, a S3 gateway can be started.
 `,
}
var (
-	serverIp                      = cmdServer.Flag.String("ip", "localhost", "ip or server name")
-	serverBindIp                  = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
-	serverMaxCpu                  = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
-	serverTimeout                 = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
-	serverDataCenter              = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
-	serverRack                    = cmdServer.Flag.String("rack", "", "current volume server's rack name")
-	serverWhiteListOption         = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission.
No limit if empty.") - serverPeers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") - serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") - serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") - masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") - masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000") - masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") - masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") - masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") - masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") - volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. 
dir[,dir]...") - volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") - pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") + serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") + serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") + serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") + serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") + serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") + volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. 
dir[,dir]...") + volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") + pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") + isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") serverWhiteList []string ) func init() { serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") + + masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") + masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") + masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") + masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") + masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "000", "Default replication type if not specified.") + masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") + masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address") + masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") - filerOptions.grpcPort = cmdServer.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to http 
port + 10000") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") @@ -88,15 +96,25 @@ func init() { serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") - serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") + serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") + s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") + s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}") + s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") + s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", 
"", "path to the TLS certificate file") + } func runServer(cmd *Command, args []string) bool { - filerOptions.secretKey = serverSecureKey + + util.LoadConfiguration("security", false) + util.LoadConfiguration("master", false) + if *serverOptions.cpuprofile != "" { f, err := os.Create(*serverOptions.cpuprofile) if err != nil { @@ -110,41 +128,53 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - master := *serverIp + ":" + strconv.Itoa(*masterPort) - filerOptions.ip = serverIp + if *isStartingS3 { + *isStartingFiler = true + } + + master := *serverIp + ":" + strconv.Itoa(*masterOptions.port) + masterOptions.ip = serverIp + masterOptions.ipBind = serverBindIp + filerOptions.masters = &master + filerOptions.ip = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp serverOptions.v.masters = &master serverOptions.v.idleConnectionTimeout = serverTimeout - serverOptions.v.maxCpu = serverMaxCpu serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack + serverOptions.v.pulseSeconds = pulseSeconds + masterOptions.pulseSeconds = pulseSeconds + + masterOptions.whiteList = serverWhiteListOption filerOptions.dataCenter = serverDataCenter + filerOptions.disableHttp = serverDisableHttp + masterOptions.disableHttp = serverDisableHttp + + filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) + s3Options.filer = &filerAddress if *filerOptions.defaultReplicaPlacement == "" { - *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement + *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication } - if *serverMaxCpu < 1 { - *serverMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*serverMaxCpu) + runtime.GOMAXPROCS(runtime.NumCPU()) folders := strings.Split(*volumeDataFolders, ",") - if *masterVolumeSizeLimitMB > 30*1000 { + if *masterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") } - if 
*masterMetaFolder == "" { - *masterMetaFolder = folders[0] + if *masterOptions.metaFolder == "" { + *masterOptions.metaFolder = folders[0] } - if err := util.TestFolderWritable(*masterMetaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) + if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err) } - filerOptions.defaultLevelDbDirectory = masterMetaFolder + filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder if *serverWhiteListOption != "" { serverWhiteList = strings.Split(*serverWhiteListOption, ",") @@ -159,55 +189,55 @@ func runServer(cmd *Command, args []string) bool { }() } - var raftWaitForMaster sync.WaitGroup + if *isStartingS3 { + go func() { + time.Sleep(2 * time.Second) + + s3Options.startS3Server() + + }() + } + var volumeWait sync.WaitGroup - raftWaitForMaster.Add(1) volumeWait.Add(1) go func() { r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *masterVolumePreallocate, - *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold, - serverWhiteList, *serverSecureKey, - ) - - glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterPort) - masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0) + ms := weed_server.NewMasterServer(r, masterOptions.toMasterOption(serverWhiteList)) + + glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterOptions.port) + masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterOptions.port), 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } go func() { + // start raftServer + myMasterAddress, peers := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) + raftServer := 
weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + peers, myMasterAddress, *masterOptions.metaFolder, ms.Topo, *masterOptions.pulseSeconds) + ms.SetRaftServer(raftServer) + r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + // starting grpc server - grpcPort := *masterGrpcPort - if grpcPort == 0 { - grpcPort = *masterPort + 10000 - } - grpcL, err := util.NewListener(*serverIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *masterOptions.port + 10000 + grpcL, err := util.NewListener(*serverBindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer() + glog.V(1).Infof("grpc config %+v", viper.Sub("grpc")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) master_pb.RegisterSeaweedServer(grpcS, ms) + protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort) grpcS.Serve(grpcL) }() - go func() { - raftWaitForMaster.Wait() - time.Sleep(100 * time.Millisecond) - myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers) - raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds) - ms.SetRaftServer(raftServer) - volumeWait.Done() - }() - - raftWaitForMaster.Done() + volumeWait.Done() // start http server httpS := &http.Server{Handler: r} diff --git a/weed/command/shell.go b/weed/command/shell.go index 19c5049c5..79f8b8bf9 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -1,21 +1,25 @@ package command import ( - "bufio" - "fmt" - "os" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/shell" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/spf13/viper" +) - "github.com/chrislusf/seaweedfs/weed/glog" +var ( + shellOptions 
shell.ShellOptions ) func init() { cmdShell.Run = runShell // break init cycle + shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") } var cmdShell = &Command{ UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. + Short: "run interactive administrative commands", + Long: `run interactive administrative commands. `, } @@ -23,39 +27,16 @@ var cmdShell = &Command{ var () func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func() { - var err error - if _, err = o.WriteString("> "); err != nil { - glog.V(0).Infoln("error writing to stdout:", err) - } - if err = o.Flush(); err != nil { - glog.V(0).Infoln("error flushing stdout:", err) - } - } - readLine := func() string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e, err) - os.Exit(1) - } - return ret - } - execCmd := func(cmd string) int { - if cmd != "" { - if _, err := o.WriteString(cmd); err != nil { - glog.V(0).Infoln("error writing to stdout:", err) - } - } - return 0 - } - - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } + + util.LoadConfiguration("security", false) + shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + + shellOptions.FilerHost = "localhost" + shellOptions.FilerPort = 8888 + shellOptions.Directory = "/" + + shell.RunShell(shellOptions) + + return true + } diff --git a/weed/command/upload.go b/weed/command/upload.go index f664c0e3a..25e938d9b 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -6,8 +6,11 @@ import ( "os" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/spf13/viper" + + "github.com/chrislusf/seaweedfs/weed/operation" ) var ( @@ -23,7 +26,6 @@ type 
UploadOptions struct { dataCenter *string ttl *string maxMB *int - secretKey *string } func init() { @@ -36,8 +38,7 @@ func init() { upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name") upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") - upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit") } var cmdUpload = &Command{ @@ -53,14 +54,17 @@ var cmdUpload = &Command{ All files under the folder and subfolders will be uploaded, each with its own file key. Optional parameter "-include" allows you to specify the file name patterns. - If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly. + If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separately. The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. 
`, } func runUpload(cmd *Command, args []string) bool { - secret := security.Secret(*upload.secretKey) + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + if len(args) == 0 { if *upload.dir == "" { return false @@ -77,9 +81,9 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*upload.master, parts, + results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, - *upload.ttl, *upload.maxMB, secret) + *upload.ttl, *upload.maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -96,9 +100,9 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*upload.master, parts, + results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, - *upload.ttl, *upload.maxMB, secret) + *upload.ttl, *upload.maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/weed/command/volume.go b/weed/command/volume.go index 27a075b5b..3c1aa2b50 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -9,6 +9,9 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" @@ -32,7 +35,6 @@ type VolumeServerOptions struct { masters *string pulseSeconds *int idleConnectionTimeout *int - maxCpu *int dataCenter *string rack *string whiteList []string @@ -41,6 +43,7 @@ type VolumeServerOptions struct { readRedirect *bool cpuProfile *string memProfile *string + compactionMBPerSecond *int } func init() { @@ -53,14 +56,14 @@ func init() { v.masters = cmdVolume.Flag.String("mserver", 
"localhost:9333", "comma-separated master servers") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") - v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") - v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") + v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") + v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") } var cmdVolume = &Command{ @@ -78,10 +81,10 @@ var ( ) func runVolume(cmd *Command, args []string) bool { - if *v.maxCpu < 1 { - *v.maxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*v.maxCpu) + + util.LoadConfiguration("security", false) + + runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*v.cpuProfile, *v.memProfile) v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) @@ -137,10 +140,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v switch *v.indexType { case "leveldb": volumeNeedleMapKind = storage.NeedleMapLevelDb - case "boltdb": - 
volumeNeedleMapKind = storage.NeedleMapBoltDb - case "btree": - volumeNeedleMapKind = storage.NeedleMapBtree + case "leveldbMedium": + volumeNeedleMapKind = storage.NeedleMapLevelDbMedium + case "leveldbLarge": + volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } masters := *v.masters @@ -152,6 +155,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readRedirect, + *v.compactionMBPerSecond, ) listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) @@ -185,13 +189,20 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer() + grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume")) volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer) reflection.Register(grpcS) go grpcS.Serve(grpcL) - if e := http.Serve(listener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) + if viper.GetString("https.volume.key") != "" { + if e := http.ServeTLS(listener, volumeMux, + viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } + } else { + if e := http.Serve(listener, volumeMux); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } } } diff --git a/weed/command/webdav.go b/weed/command/webdav.go new file mode 100644 index 000000000..371c4a9ad --- /dev/null +++ b/weed/command/webdav.go @@ -0,0 +1,109 @@ +package command + +import ( + "fmt" + "net/http" + "os/user" + "strconv" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/spf13/viper" +) + +var ( + webDavStandaloneOptions 
WebDavOption +) + +type WebDavOption struct { + filer *string + port *int + collection *string + tlsPrivateKey *string + tlsCertificate *string +} + +func init() { + cmdWebDav.Run = runWebDav // break init cycle + webDavStandaloneOptions.filer = cmdWebDav.Flag.String("filer", "localhost:8888", "filer server address") + webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port") + webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") + webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") + webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") +} + +var cmdWebDav = &Command{ + UsageLine: "webdav -port=7333 -filer=<ip:port>", + Short: "<unstable> start a webdav server that is backed by a filer", + Long: `start a webdav server that is backed by a filer. + +`, +} + +func runWebDav(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port) + + return webDavStandaloneOptions.startWebDav() + +} + +func (wo *WebDavOption) startWebDav() bool { + + filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer) + if err != nil { + glog.Fatal(err) + return false + } + + // detect current user + uid, gid := uint32(0), uint32(0) + if u, err := user.Current(); err == nil { + if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { + uid = uint32(parsedId) + } + if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil { + gid = uint32(parsedId) + } + } + + ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ + Filer: *wo.filer, + FilerGrpcAddress: filerGrpcAddress, + GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + Collection: *wo.collection, + Uid: uid, + Gid: gid, + }) + 
if webdavServer_err != nil { + glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) + } + + httpS := &http.Server{Handler: ws.Handler} + + listenAddress := fmt.Sprintf(":%d", *wo.port) + webDavListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) + if err != nil { + glog.Fatalf("WebDav Server listener on %s error: %v", listenAddress, err) + } + + if *wo.tlsPrivateKey != "" { + glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.VERSION, *wo.port) + if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil { + glog.Fatalf("WebDav Server Fail to serve: %v", err) + } + } else { + glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.VERSION, *wo.port) + if err = httpS.Serve(webDavListener); err != nil { + glog.Fatalf("WebDav Server Fail to serve: %v", err) + } + } + + return true + +} diff --git a/weed/command/weedfuse/README.md b/weed/command/weedfuse/README.md new file mode 100644 index 000000000..1a1496bbb --- /dev/null +++ b/weed/command/weedfuse/README.md @@ -0,0 +1,84 @@ +Mount the SeaweedFS via FUSE + +# Mount by fstab + + +``` +$ # on linux +$ sudo apt-get install fuse +$ sudo echo 'user_allow_other' >> /etc/fuse.conf +$ sudo mv weedfuse /sbin/mount.weedfuse + +$ # on Mac +$ sudo mv weedfuse /sbin/mount_weedfuse + +``` + +On both OS X and Linux, you can add one of the entries to your /etc/fstab file like the following: + +``` +# mount the whole SeaweedFS +localhost:8888/ /home/some/mount/folder weedfuse + +# mount the SeaweedFS sub folder +localhost:8888/sub/dir /home/some/mount/folder weedfuse + +# mount the SeaweedFS sub folder with some options +localhost:8888/sub/dir /home/some/mount/folder weedfuse user + +``` + +To verify it can work, try this command +``` +$ sudo mount -av + +... + +/home/some/mount/folder : successfully mounted + +``` + +If you see `successfully mounted`, try to access the mounted folder and verify everything works. 
+ + +To debug, run these: +``` + +$ weedfuse -foreground localhost:8888/ /home/some/mount/folder + +``` + + +To unmount the folder: +``` + +$ sudo umount /home/some/mount/folder + +``` + +<!-- not working yet! + +# Mount by autofs + +AutoFS can mount a folder if accessed. + +``` +# install autofs +$ sudo apt-get install autofs +``` + +Here is an example on how to mount a folder for all users under `/home` directory. +Assuming there exists corresponding folders under `/home` on both local and SeaweedFS. + +Edit `/etc/auto.master` and `/etc/auto.weedfuse` file with these content +``` +$ cat /etc/auto.master +/home /etc/auto.weedfuse + +$ cat /etc/auto.weedfuse +# map /home/<user> to localhost:8888/home/<user> +* -fstype=weedfuse,rw,allow_other,foreground :localhost\:8888/home/& + +``` + +--> diff --git a/weed/command/weedfuse/weedfuse.go b/weed/command/weedfuse/weedfuse.go new file mode 100644 index 000000000..4c0d12874 --- /dev/null +++ b/weed/command/weedfuse/weedfuse.go @@ -0,0 +1,109 @@ +package main + +import ( + "flag" + "fmt" + "os" + "strings" + + "github.com/chrislusf/seaweedfs/weed/command" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/jacobsa/daemonize" + "github.com/kardianos/osext" +) + +var ( + fuseCommand = flag.NewFlagSet("weedfuse", flag.ContinueOnError) + options = fuseCommand.String("o", "", "comma separated options rw,uid=xxx,gid=xxx") + isForeground = fuseCommand.Bool("foreground", false, "starts as a daemon") +) + +func main() { + + err := fuseCommand.Parse(os.Args[1:]) + if err != nil { + glog.Fatal(err) + } + fmt.Printf("options: %v\n", *options) + + // seems this value is always empty, need to parse it differently + optionsString := *options + prev := "" + for i, arg := range os.Args { + fmt.Printf("args[%d]: %v\n", i, arg) + if prev == "-o" { + optionsString = arg + } + prev = arg + } + + device := fuseCommand.Arg(0) + mountPoint := fuseCommand.Arg(1) + + fmt.Printf("source: %v\n", device) + fmt.Printf("target: %v\n", 
mountPoint) + + nouser := true + for _, option := range strings.Split(optionsString, ",") { + fmt.Printf("option: %v\n", option) + switch option { + case "user": + nouser = false + } + } + + maybeSetupPath() + + if !*isForeground { + startAsDaemon() + return + } + + parts := strings.SplitN(device, "/", 2) + filer, filerPath := parts[0], parts[1] + + command.RunMount( + filer, "/"+filerPath, mountPoint, "", "000", "", + 4, !nouser, 0, 1000000) + +} + +func maybeSetupPath() { + // sudo mount -av may not include PATH in some linux, e.g., Ubuntu + hasPathEnv := false + for _, e := range os.Environ() { + if strings.HasPrefix(e, "PATH=") { + hasPathEnv = true + } + fmt.Println(e) + } + if !hasPathEnv { + os.Setenv("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin") + } +} + +func startAsDaemon() { + + // adapted from gcsfuse + + // Find the executable. + var path string + path, err := osext.Executable() + if err != nil { + glog.Fatalf("osext.Executable: %v", err) + } + + // Set up arguments. Be sure to use foreground mode. + args := append([]string{"-foreground"}, os.Args[1:]...) + + // Pass along PATH so that the daemon can find fusermount on Linux. + env := []string{ + fmt.Sprintf("PATH=%s", os.Getenv("PATH")), + } + + err = daemonize.Run(path, args, env, os.Stdout) + if err != nil { + glog.Fatalf("daemonize.Run: %v", err) + } + +} |
