diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/benchmark.go | 27 | ||||
| -rw-r--r-- | weed/command/command.go | 3 | ||||
| -rw-r--r-- | weed/command/export.go | 2 | ||||
| -rw-r--r-- | weed/command/filer.go | 64 | ||||
| -rw-r--r-- | weed/command/filer_backup.go | 1 | ||||
| -rw-r--r-- | weed/command/filer_cat.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 38 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 268 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 4 | ||||
| -rw-r--r-- | weed/command/gateway.go | 93 | ||||
| -rw-r--r-- | weed/command/iam.go | 97 | ||||
| -rw-r--r-- | weed/command/master.go | 7 | ||||
| -rw-r--r-- | weed/command/mount.go | 4 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 29 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 2 | ||||
| -rw-r--r-- | weed/command/s3.go | 2 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 31 | ||||
| -rw-r--r-- | weed/command/server.go | 36 | ||||
| -rw-r--r-- | weed/command/upload.go | 2 | ||||
| -rw-r--r-- | weed/command/volume.go | 86 | ||||
| -rw-r--r-- | weed/command/webdav.go | 2 |
21 files changed, 676 insertions, 124 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index c1bc80c42..4fedb55f1 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -41,6 +41,7 @@ type BenchmarkOptions struct { grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient fsync *bool + useTcp *bool } var ( @@ -67,6 +68,7 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") + b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") sharedBytes = make([]byte, 1024) } @@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) + volumeTcpClient := wdclient.NewVolumeTcpClient() + for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) @@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { + if *b.useTcp { + if uploadByTcp(volumeTcpClient, fp) { + fileIdLineChan <- fp.Fid + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + } + } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -293,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, _, err = util.FastGet(url) + bytes, _, err = util.Get(url) if err == nil { break } @@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b } } +func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool { + + err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) + if err != nil { + glog.Errorf("upload chunk err: %v", err) + return false + } + + return true +} + func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. if err != nil { diff --git a/weed/command/command.go b/weed/command/command.go index 5506e6969..b6efcead2 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -17,13 +17,16 @@ var Commands = []*Command{ cmdFiler, cmdFilerBackup, cmdFilerCat, + cmdFilerMetaBackup, cmdFilerMetaTail, cmdFilerReplicate, cmdFilerSynchronize, cmdFix, + cmdGateway, cmdMaster, cmdMount, cmdS3, + cmdIam, cmdMsgBroker, cmdScaffold, cmdServer, diff --git a/weed/command/export.go b/weed/command/export.go index f100f3af5..1c32e1050 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -215,7 +215,7 @@ func runExport(cmd *Command, args []string) bool { err = storage.ScanVolumeFile(util.ResolvePath(*export.dir), *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner) if err != nil && err != io.EOF { - glog.Fatalf("Export Volume File [ERROR] %s\n", err) + glog.Errorf("Export Volume File [ERROR] %s\n", err) } return true } diff --git a/weed/command/filer.go b/weed/command/filer.go index 534bd9e04..a723b4d8a 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -25,6 +25,8 @@ var ( filerS3Options S3Options filerStartWebDav *bool filerWebDavOptions WebDavOption + filerStartIam *bool + filerIamOptions IamOptions ) type FilerOptions struct { @@ -47,6 +49,7 @@ type FilerOptions struct { metricsHttpPort *int saveToFilerLimit *int defaultLevelDbDirectory *string + concurrentUploadLimitMB *int } func init() { @@ -54,12 +57,12 @@ func init() { f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") - f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") - f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit") + f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit") f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size") f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack") @@ -69,6 +72,7 @@ func init() { f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") + f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -89,6 +93,10 @@ func init() { filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB") + + // start iam on filer + filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service") + filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port") } var cmdFiler = &Command{ @@ -106,7 +114,7 @@ var cmdFiler = &Command{ GET /path/to/ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order. - If the "filer.toml" is not found, an embedded filer store will be craeted under "-defaultStoreDir". + If the "filer.toml" is not found, an embedded filer store will be created under "-defaultStoreDir". The example filer.toml configuration file can be generated by "weed scaffold -config=filer" @@ -119,22 +127,33 @@ func runFiler(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*f.metricsHttpPort) + filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port) + startDelay := time.Duration(2) if *filerStartS3 { - filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port) filerS3Options.filer = &filerAddress go func() { - time.Sleep(2 * time.Second) + time.Sleep(startDelay * time.Second) filerS3Options.startS3Server() }() + startDelay++ } if *filerStartWebDav { - filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port) filerWebDavOptions.filer = &filerAddress go func() { - time.Sleep(2 * time.Second) + time.Sleep(startDelay * time.Second) filerWebDavOptions.startWebDav() }() + startDelay++ + } + + if *filerStartIam { + filerIamOptions.filer = &filerAddress + filerIamOptions.masters = f.masters + go func() { + time.Sleep(startDelay * time.Second) + filerIamOptions.startIamServer() + }() } f.startFiler() @@ -159,21 +178,22 @@ func (fo *FilerOptions) startFiler() { } fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*fo.masters, ","), - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - DisableDirListing: *fo.disableDirListing, - MaxMB: *fo.maxMB, - DirListingLimit: *fo.dirListingLimit, - DataCenter: *fo.dataCenter, - Rack: *fo.rack, - DefaultLevelDbDir: defaultLevelDbDirectory, - DisableHttp: *fo.disableHttp, - Host: *fo.ip, - Port: uint32(*fo.port), - Cipher: *fo.cipher, - SaveToFilerLimit: *fo.saveToFilerLimit, - Filers: peers, + Masters: strings.Split(*fo.masters, ","), + Collection: *fo.collection, + DefaultReplication: *fo.defaultReplicaPlacement, + DisableDirListing: *fo.disableDirListing, + MaxMB: *fo.maxMB, + DirListingLimit: *fo.dirListingLimit, + DataCenter: *fo.dataCenter, + Rack: *fo.rack, + DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Host: *fo.ip, + Port: uint32(*fo.port), + Cipher: *fo.cipher, + SaveToFilerLimit: int64(*fo.saveToFilerLimit), + Filers: peers, + ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 8cb7441f6..888b46fe7 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -155,4 +155,3 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti }) } - diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index a46098b04..c4281feba 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -110,7 +110,7 @@ func runFilerCat(cmd *Command, args []string) bool { filerCat.filerClient = client - return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64, false) }) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 42f5ec4c3..e7a9b107f 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -56,7 +56,7 @@ func init() { copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit") copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") } @@ -207,16 +207,6 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } mode := fi.Mode() - if mode.IsDir() { - files, _ := ioutil.ReadDir(fileOrDir) - for _, subFileOrDir := range files { - if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { - return err - } - } - return nil - } - uid, gid := util.GetFileUidGid(fi) fileCopyTaskChan <- FileCopyTask{ @@ -228,6 +218,16 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi gid: gid, } + if mode.IsDir() { + files, _ := ioutil.ReadDir(fileOrDir) + println("checking directory", fileOrDir) + for _, subFileOrDir := range files { + if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { + return err + } + } + } + return nil } @@ -293,20 +293,22 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err // upload the file content fileName := filepath.Base(f.Name()) - mimeType := detectMimeType(f) - data, err := ioutil.ReadAll(f) - if err != nil { - return err - } + var mimeType string var chunks []*filer_pb.FileChunk var assignResult *filer_pb.AssignVolumeResponse var assignError error - if task.fileSize > 0 { + if task.fileMode & os.ModeDir == 0 && task.fileSize > 0 { + + mimeType = detectMimeType(f) + data, err := ioutil.ReadAll(f) + if err != nil { + return err + } // assign a volume - err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go new file mode 100644 index 000000000..ba0b44659 --- /dev/null +++ b/weed/command/filer_meta_backup.go @@ -0,0 +1,268 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" + "google.golang.org/grpc" + "io" + "reflect" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + metaBackup FilerMetaBackupOptions +) + +type FilerMetaBackupOptions struct { + grpcDialOption grpc.DialOption + filerAddress *string + filerDirectory *string + restart *bool + backupFilerConfig *string + + store filer.FilerStore +} + +func init() { + cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle + metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port") + metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer") + metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup") + metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store") +} + +var cmdFilerMetaBackup = &Command{ + UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml", + Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml", + Long: `continuously backup filer meta data changes. +The backup writes to another filer store specified in a backup_filer.toml. + + weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" + weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart + + `, +} + +func runFilerMetaBackup(cmd *Command, args []string) bool { + + metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + // load backup_filer.toml + v := viper.New() + v.SetConfigFile(*metaBackup.backupFilerConfig) + + if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file + glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+ + " weed scaffold -config=%s -output=.\n\n\n", + *metaBackup.backupFilerConfig, "backup_filer", "filer") + } + + if err := metaBackup.initStore(v); err != nil { + glog.V(0).Infof("init backup filer store: %v", err) + return true + } + + missingPreviousBackup := false + _, err := metaBackup.getOffset() + if err != nil { + missingPreviousBackup = true + } + + if *metaBackup.restart || missingPreviousBackup { + glog.V(0).Infof("traversing metadata tree...") + startTime := time.Now() + if err := metaBackup.traverseMetadata(); err != nil { + glog.Errorf("traverse meta data: %v", err) + return true + } + glog.V(0).Infof("metadata copied up to %v", startTime) + if err := metaBackup.setOffset(startTime); err != nil { + startTime = time.Now() + } + } + + for { + err := metaBackup.streamMetadataBackup() + if err != nil { + glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err) + time.Sleep(1747 * time.Millisecond) + } + } + + return true +} + +func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error { + // load configuration for default filer store + hasDefaultStoreConfigured := false + for _, store := range filer.Stores { + if v.GetBool(store.GetName() + ".enabled") { + store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore) + if err := store.Initialize(v, store.GetName()+"."); err != nil { + glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) + } + glog.V(0).Infof("configured filer store to %s", store.GetName()) + hasDefaultStoreConfigured = true + metaBackup.store = filer.NewFilerStoreWrapper(store) + break + } + } + if !hasDefaultStoreConfigured { + return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed()) + } + + return nil +} + +func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) { + var saveErr error + + traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) { + + println("+", parentPath.Child(entry.Name)) + if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil { + saveErr = fmt.Errorf("insert entry error: %v\n", err) + return + } + + }) + + if traverseErr != nil { + return fmt.Errorf("traverse: %v", traverseErr) + } + return saveErr +} + +var ( + MetaBackupKey = []byte("metaBackup") +) + +func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { + + startTime, err := metaBackup.getOffset() + if err != nil { + startTime = time.Now() + } + glog.V(0).Infof("streaming from %v", startTime) + + store := metaBackup.store + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + + ctx := context.Background() + message := resp.EventNotification + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.InsertEntry(ctx, entry) + } + if message.OldEntry != nil && message.NewEntry == nil { + println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + } + if message.OldEntry != nil && message.NewEntry != nil { + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.UpdateEntry(ctx, entry) + } + println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) + if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil { + return err + } + println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry)) + } + + return nil + } + + tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "meta_backup", + PathPrefix: *metaBackup.filerDirectory, + SinceNs: startTime.UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + var counter int64 + var lastWriteTime time.Time + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + if err = eachEntryFunc(resp); err != nil { + return err + } + + counter++ + if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { + glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) + counter = 0 + lastWriteTime = time.Now() + if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil { + return err2 + } + } + + } + + }) + return tailErr +} + +func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { + value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey) + if err != nil { + return + } + tsNs := util.BytesToUint64(value) + + return time.Unix(0, int64(tsNs)), nil +} + +func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error { + valueBuf := make([]byte, 8) + util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano())) + + if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil { + return err + } + return nil +} + +var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) + +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) + +} + +func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 189cacefc..8451ffd78 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -23,7 +23,7 @@ func init() { } var cmdFilerMetaTail = &Command{ - UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]", + UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]", Short: "see continuous changes on a filer", Long: `See continuous changes on a filer. @@ -36,7 +36,7 @@ var cmdFilerMetaTail = &Command{ var ( tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port") - tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer") tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>") diff --git a/weed/command/gateway.go b/weed/command/gateway.go new file mode 100644 index 000000000..8a6f852a5 --- /dev/null +++ b/weed/command/gateway.go @@ -0,0 +1,93 @@ +package command + +import ( + "net/http" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + gatewayOptions GatewayOptions +) + +type GatewayOptions struct { + masters *string + filers *string + bindIp *string + port *int + maxMB *int +} + +func init() { + cmdGateway.Run = runGateway // break init cycle + gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") + gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") + gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") + gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") + gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") +} + +var cmdGateway = &Command{ + UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*", + Short: "start a gateway server that points to a list of master servers or a list of filers", + Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. + + POST /blobs/ + upload the blob and return a chunk id + DELETE /blobs/<chunk_id> + delete a chunk id + + /* + POST /files/path/to/a/file + save /path/to/a/file on filer + DELETE /files/path/to/a/file + delete /path/to/a/file on filer + + POST /topics/topicName + save on filer to /topics/topicName/<ds>/ts.json + */ +`, +} + +func runGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + gatewayOptions.startGateway() + + return true +} + +func (gw *GatewayOptions) startGateway() { + + defaultMux := http.NewServeMux() + + _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ + Masters: strings.Split(*gw.masters, ","), + Filers: strings.Split(*gw.filers, ","), + MaxMB: *gw.maxMB, + }) + if gws_err != nil { + glog.Fatalf("Gateway startup error: %v", gws_err) + } + + glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) + gatewayListener, e := util.NewListener( + *gw.bindIp+":"+strconv.Itoa(*gw.port), + time.Duration(10)*time.Second, + ) + if e != nil { + glog.Fatalf("Filer listener error: %v", e) + } + + httpS := &http.Server{Handler: defaultMux} + if err := httpS.Serve(gatewayListener); err != nil { + glog.Fatalf("Gateway Fail to serve: %v", e) + } + +} diff --git a/weed/command/iam.go b/weed/command/iam.go new file mode 100644 index 000000000..17d0832cb --- /dev/null +++ b/weed/command/iam.go @@ -0,0 +1,97 @@ +package command + +import ( + "context" + "fmt" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/iamapi" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" + "time" +) + +var ( + iamStandaloneOptions IamOptions +) + +type IamOptions struct { + filer *string + masters *string + port *int +} + +func init() { + cmdIam.Run = runIam // break init cycle + iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address") + iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers") + iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port") +} + +var cmdIam = &Command{ + UsageLine: "iam [-port=8111] [-filer=<ip:port>] [-masters=<ip:port>,<ip:port>]", + Short: "start a iam API compatible server", + Long: "start a iam API compatible server.", +} + +func runIam(cmd *Command, args []string) bool { + return iamStandaloneOptions.startIamServer() +} + +func (iamopt *IamOptions) startIamServer() bool { + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*iamopt.filer) + if err != nil { + glog.Fatal(err) + return false + } + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + glog.V(0).Infof("IAM read filer configuration: %s", resp) + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + break + } + } + + router := mux.NewRouter().SkipClean(true) + _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ + Filer: *iamopt.filer, + Port: *iamopt.port, + FilerGrpcAddress: filerGrpcAddress, + GrpcDialOption: grpcDialOption, + }) + glog.V(0).Info("NewIamApiServer created") + if iamApiServer_err != nil { + glog.Fatalf("IAM API Server startup error: %v", iamApiServer_err) + } + + httpS := &http.Server{Handler: router} + + listenAddress := fmt.Sprintf(":%d", *iamopt.port) + iamApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) + if err != nil { + glog.Fatalf("IAM API Server listener on %s error: %v", listenAddress, err) + } + + glog.V(0).Infof("Start Seaweed IAM API Server %s at http port %d", util.Version(), *iamopt.port) + if err = httpS.Serve(iamApiListener); err != nil { + glog.Fatalf("IAM API Server Fail to serve: %v", err) + } + + return true +} diff --git a/weed/command/master.go b/weed/command/master.go index d569919cd..0f5e2156d 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -6,7 +6,6 @@ import ( "google.golang.org/grpc/reflection" "net/http" "os" - "runtime" "sort" "strconv" "strings" @@ -48,8 +47,8 @@ type MasterOptions struct { func init() { cmdMaster.Run = runMaster // break init cycle m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") - m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address") - m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier") + m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to") m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") @@ -86,7 +85,6 @@ func runMaster(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) - runtime.GOMAXPROCS(runtime.NumCPU()) grace.SetupProfiling(*masterCpuProfile, *masterMemProfile) parent, _ := util.FullPath(*m.metaFolder).DirAndName() @@ -138,7 +136,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } - // Create your protocol servers. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) diff --git a/weed/command/mount.go b/weed/command/mount.go index ea439af7c..5811f0b99 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -25,6 +25,7 @@ type MountOptions struct { volumeServerAccess *string uidMap *string gidMap *string + readOnly *bool } var ( @@ -45,7 +46,7 @@ func init() { mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") - mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0") + mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0") mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") @@ -55,6 +56,7 @@ func init() { mountOptions.volumeServerAccess = cmdMount.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]") mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>") mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>") + mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index a6d562d40..2474cf7dd 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -53,7 +53,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { filer := *option.filer // parse filer grpc address - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(filer) if err != nil { glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) return true @@ -63,16 +63,23 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + for i := 0; i < 10; i++ { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) if err != nil { - return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + glog.V(0).Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.V(0).Infof("wait for %d seconds ...", i+1) + time.Sleep(time.Duration(i+1) * time.Second) } - cipher = resp.Cipher - return nil - }) + } if err != nil { - glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + glog.Errorf("failed to talk to filer %s: %v", filerGrpcAddress, err) return true } @@ -142,8 +149,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { fuse.Subtype("seaweedfs"), // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders fuse.NoAppleXattr(), - fuse.NoBrowse(), - fuse.AutoXattr(), fuse.ExclCreate(), fuse.DaemonTimeout("3600"), fuse.AllowSUID(), @@ -162,6 +167,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { if *option.nonempty { options = append(options, fuse.AllowNonEmptyMount()) } + if *option.readOnly { + options = append(options, fuse.ReadOnly()) + } // find mount point mountRoot := filerMountRootPath @@ -186,7 +194,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { CacheDir: *option.cacheDir, CacheSizeMB: *option.cacheSizeMB, DataCenter: *option.dataCenter, - EntryCacheTtl: 3 * time.Second, MountUid: uid, MountGid: gid, MountMode: mountMode, diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index b4b5855ff..db0b4148d 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -63,7 +63,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/command/s3.go b/weed/command/s3.go index d8e3e306b..c8292a7d5 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -137,7 +137,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index c2d53e4bd..88dc94df1 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -103,9 +103,9 @@ dir = "./filerrdb" # directory to store rocksdb files [mysql] # or memsql, tidb # CREATE TABLE IF NOT EXISTS filemeta ( -# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', -# name VARCHAR(1000) COMMENT 'directory or file name', -# directory TEXT COMMENT 'full path to parent directory', +# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', +# name VARCHAR(1000) BINARY COMMENT 'directory or file name', +# directory TEXT COMMENT 'full path to parent directory', # meta LONGBLOB, # PRIMARY KEY (dirhash, name) # ) DEFAULT CHARSET=utf8; @@ -120,13 +120,16 @@ connection_max_idle = 2 connection_max_open = 100 connection_max_lifetime_seconds = 0 interpolateParams = false +# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: +enableUpsert = true +upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" [mysql2] # or memsql, tidb enabled = false createTable = """ CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` ( dirhash BIGINT, - name VARCHAR(1000), + name VARCHAR(1000) BINARY, directory TEXT, meta LONGBLOB, PRIMARY KEY (dirhash, name) @@ -141,6 +144,9 @@ connection_max_idle = 2 connection_max_open = 100 connection_max_lifetime_seconds = 0 interpolateParams = false +# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: +enableUpsert = true +upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" [postgres] # or cockroachdb, YugabyteDB # CREATE TABLE IF NOT EXISTS filemeta ( @@ -161,6 +167,9 @@ sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 connection_max_lifetime_seconds = 0 +# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: +enableUpsert = true +upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta""" [postgres2] enabled = false @@ -183,6 +192,9 @@ sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 connection_max_lifetime_seconds = 0 +# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: +enableUpsert = true +upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta""" [cassandra] # CREATE TABLE filemeta ( @@ -269,7 +281,7 @@ index.max_result_window = 10000 # Make sure they are not the same if using the same store type! # 4. Set enabled to true # -# The following is just using cassandra as an example +# The following is just using redis as an example ########################## [redis2.tmp] enabled = false @@ -440,22 +452,28 @@ expires_after_seconds = 10 # seconds # the host name is not checked, so the PERM files can be shared. [grpc] ca = "" +# Set wildcard domain for enable TLS authentication by common names +allowed_wildcard_domain = "" # .mycompany.com [grpc.volume] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.master] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.filer] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.msg_broker] cert = "" key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names # use this for any place needs a grpc client # i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload" @@ -463,7 +481,6 @@ key = "" cert = "" key = "" - # volume server https options # Note: work in progress! # this does not work with other clients, e.g., "weed filer|mount" etc, yet. @@ -501,7 +518,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs [master.sequencer] -type = "raft" # Choose [raft|etcd] type for storing the file id sequence +type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence # when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence # example : http://127.0.0.1:2379,http://127.0.0.1:2389 sequencer_etcd_urls = "http://127.0.0.1:2379" diff --git a/weed/command/server.go b/weed/command/server.go index 611578953..6eb3bf97c 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,9 +2,8 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/grace" "os" - "runtime" - "runtime/pprof" "strings" "time" @@ -16,6 +15,7 @@ import ( type ServerOptions struct { cpuprofile *string + memprofile *string v VolumeServerOptions } @@ -49,8 +49,8 @@ var cmdServer = &Command{ } var ( - serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name") - serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") + serverBindIp = cmdServer.Flag.String("ip.bind", "", "ip address to bind to") serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") @@ -76,6 +76,7 @@ var ( func init() { serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") + serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file") masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") @@ -93,11 +94,12 @@ func init() { filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") - filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") + filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 4, "split files larger than the limit") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") @@ -107,10 +109,12 @@ func init() { serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") + serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") + serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") @@ -137,14 +141,7 @@ func runServer(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) - if *serverOptions.cpuprofile != "" { - f, err := os.Create(*serverOptions.cpuprofile) - if err != nil { - glog.Fatal(err) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } + grace.SetupProfiling(*serverOptions.cpuprofile, *serverOptions.memprofile) if *isStartingS3 { *isStartingFiler = true @@ -156,19 +153,21 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) - peers := strings.Join(peerList, ",") - masterOptions.peers = &peers + if *isStartingMasterServer { + _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) + peers := strings.Join(peerList, ",") + masterOptions.peers = &peers + } // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = &peers + filerOptions.masters = masterOptions.peers filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = &peers + serverOptions.v.masters = masterOptions.peers serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack @@ -189,7 +188,6 @@ func runServer(cmd *Command, args []string) bool { webdavOptions.filer = &filerAddress msgBrokerOptions.filer = &filerAddress - runtime.GOMAXPROCS(runtime.NumCPU()) go stats_collect.StartMetricsServer(*serverMetricsHttpPort) folders := strings.Split(*volumeDataFolders, ",") diff --git a/weed/command/upload.go b/weed/command/upload.go index 67fde2185..0f9361b40 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -43,7 +43,7 @@ func init() { upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit") + upload.maxMB = cmdUpload.Flag.Int("maxMB", 4, "split files larger than the limit") upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server") } diff --git a/weed/command/volume.go b/weed/command/volume.go index 659c93d96..9df500178 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -6,7 +6,6 @@ import ( "net/http" httppprof "net/http/pprof" "os" - "runtime" "runtime/pprof" "strconv" "strings" @@ -36,41 +35,43 @@ var ( ) type VolumeServerOptions struct { - port *int - publicPort *int - folders []string - folderMaxLimits []int - idxFolder *string - ip *string - publicUrl *string - bindIp *string - masters *string - idleConnectionTimeout *int - dataCenter *string - rack *string - whiteList []string - indexType *string - diskType *string - fixJpgOrientation *bool - readRedirect *bool - cpuProfile *string - memProfile *string - compactionMBPerSecond *int - fileSizeLimitMB *int - minFreeSpacePercents []float32 - pprof *bool - preStopSeconds *int - metricsHttpPort *int + port *int + publicPort *int + folders []string + folderMaxLimits []int + idxFolder *string + ip *string + publicUrl *string + bindIp *string + masters *string + idleConnectionTimeout *int + dataCenter *string + rack *string + whiteList []string + indexType *string + diskType *string + fixJpgOrientation *bool + readRedirect *bool + cpuProfile *string + memProfile *string + compactionMBPerSecond *int + fileSizeLimitMB *int + concurrentUploadLimitMB *int + minFreeSpacePercents []float32 + pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int + enableTcp *bool } func init() { cmdVolume.Run = runVolume // break init cycle v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") - v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name") + v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") - v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") @@ -85,9 +86,11 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") + v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port") } var cmdVolume = &Command{ @@ -109,8 +112,6 @@ func runVolume(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - runtime.GOMAXPROCS(runtime.NumCPU()) - // If --pprof is set we assume the caller wants to be able to collect // cpu and memory profiles via go tool pprof if !*v.pprof { @@ -238,6 +239,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.fixJpgOrientation, *v.readRedirect, *v.compactionMBPerSecond, *v.fileSizeLimitMB, + int64(*v.concurrentUploadLimitMB)*1024*1024, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -251,6 +253,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } } + // starting tcp server + if *v.enableTcp { + go v.startTcpService(volumeServer) + } + // starting the cluster http server clusterHttpServer := v.startClusterHttpService(volumeMux) @@ -368,3 +375,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd }() return clusterHttpServer } + +func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000) + glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) + listener, e := util.NewListener(listeningAddress, 0) + if e != nil { + glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e) + } + defer listener.Close() + + for { + c, err := listener.Accept() + if err != nil { + fmt.Println(err) + return + } + go volumeServer.HandleTcpConnection(c) + } +} diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 2bd4a3c61..781ea1e36 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -78,7 +78,7 @@ func (wo *WebDavOption) startWebDav() bool { } // parse filer grpc address - filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer) + filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer) if err != nil { glog.Fatal(err) return false |
