diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/command | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 2 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 74 | ||||
| -rw-r--r-- | weed/command/command.go | 16 | ||||
| -rw-r--r-- | weed/command/compact.go | 2 | ||||
| -rw-r--r-- | weed/command/download.go | 1 | ||||
| -rw-r--r-- | weed/command/export.go | 2 | ||||
| -rw-r--r-- | weed/command/filer.go | 24 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 172 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 1 | ||||
| -rw-r--r-- | weed/command/fix.go | 1 | ||||
| -rw-r--r-- | weed/command/master.go | 16 | ||||
| -rw-r--r-- | weed/command/mount.go | 60 | ||||
| -rw-r--r-- | weed/command/mount_linux.go | 4 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 143 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 114 | ||||
| -rw-r--r-- | weed/command/s3.go | 113 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 54 | ||||
| -rw-r--r-- | weed/command/server.go | 37 | ||||
| -rw-r--r-- | weed/command/shell.go | 15 | ||||
| -rw-r--r-- | weed/command/upload.go | 26 | ||||
| -rw-r--r-- | weed/command/version.go | 2 | ||||
| -rw-r--r-- | weed/command/volume.go | 19 | ||||
| -rw-r--r-- | weed/command/watch.go | 65 | ||||
| -rw-r--r-- | weed/command/webdav.go | 56 |
24 files changed, 733 insertions, 286 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index eb2b5ba4a..615be80cf 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil { + if err = v.Compact2(30*1024*1024*1024, 0); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 382e7c850..de44fac75 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -19,6 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" @@ -40,6 +41,8 @@ type BenchmarkOptions struct { maxCpu *int grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient + grpcRead *bool + fsync *bool } var ( @@ -64,6 +67,8 @@ func init() { b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read") + b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") sharedBytes = make([]byte, 1024) } @@ -110,7 +115,7 @@ func runBenchmark(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { *b.maxCpu = runtime.NumCPU() } @@ -124,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ",")) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ",")) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() @@ -224,9 +229,10 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) fp := &operation.FilePart{ - Reader: &FakeReader{id: uint64(id), size: fileSize}, + Reader: &FakeReader{id: uint64(id), size: fileSize, random: random}, FileSize: fileSize, MimeType: "image/bench", // prevent gzip benchmark content + Fsync: *b.fsync, } ar := &operation.VolumeAssignRequest{ Count: 1, @@ -238,7 +244,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil { + if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -278,23 +284,61 @@ func readFiles(fileIdLineChan chan string, s *stat) { fmt.Printf("reading file %s\n", fid) } start := time.Now() - url, err := b.masterClient.LookupFileId(fid) - if err != nil { - s.failed++ - println("!!!! ", fid, " location not found!!!!!") - continue + var bytesRead int + var err error + if *b.grpcRead { + volumeServer, err := b.masterClient.LookupVolumeServer(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption) + } else { + url, err := b.masterClient.LookupFileId(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + var bytes []byte + bytes, err = util.Get(url) + bytesRead = len(bytes) } - if bytesRead, err := util.Get(url); err == nil { + if err == nil { s.completed++ - s.transferred += int64(len(bytesRead)) + s.transferred += int64(bytesRead) readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - fmt.Printf("Failed to read %s error:%v\n", url, err) + fmt.Printf("Failed to read %s error:%v\n", fid, err) } } } +func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) { + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid}) + if err != nil { + return err + } + + for { + resp, respErr := fileGetClient.Recv() + if resp != nil { + bytesRead += len(resp.Data) + } + if respErr != nil { + if respErr == io.EOF { + return nil + } + return respErr + } + } + }) + return +} + func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { @@ -509,8 +553,9 @@ func (s *stats) printStats() { // a fake reader to generate content to upload type FakeReader struct { - id uint64 // an id number - size int64 // max bytes + id uint64 // an id number + size int64 // max bytes + random *rand.Rand } func (l *FakeReader) Read(p []byte) (n int, err error) { @@ -526,6 +571,7 @@ func (l *FakeReader) Read(p []byte) (n int, err error) { for i := 0; i < 8; i++ { p[i] = byte(l.id >> uint(i*8)) } + l.random.Read(p[8:]) } l.size -= int64(n) return diff --git a/weed/command/command.go b/weed/command/command.go index 79c00d4cd..9a41a8a7c 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -12,20 +12,22 @@ var Commands = []*Command{ cmdBackup, cmdCompact, cmdCopy, - cmdFix, + cmdDownload, + cmdExport, + cmdFiler, cmdFilerReplicate, - cmdServer, + cmdFix, cmdMaster, - cmdFiler, + cmdMount, cmdS3, - cmdUpload, - cmdDownload, + cmdMsgBroker, cmdScaffold, + cmdServer, cmdShell, + cmdWatch, + cmdUpload, cmdVersion, cmdVolume, - cmdExport, - cmdMount, cmdWebDav, } diff --git a/weed/command/compact.go b/weed/command/compact.go index 85313b749..4e28aa725 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(preallocate); err != nil { + if err = v.Compact2(preallocate, 0); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/command/download.go b/weed/command/download.go index b3e33defd..be0eb47e5 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -71,6 +71,7 @@ func downloadToFile(server, fileId, saveDir string) error { } f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) if err != nil { + io.Copy(ioutil.Discard, rc) return err } defer f.Close() diff --git a/weed/command/export.go b/weed/command/export.go index 8d664ad3b..8c32b3f4d 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -195,6 +195,8 @@ func runExport(cmd *Command, args []string) bool { vid := needle.VolumeId(*export.volumeId) needleMap := needle_map.NewMemDb() + defer needleMap.Close() + if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil { glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err) } diff --git a/weed/command/filer.go b/weed/command/filer.go index ea8392fac..e258b695d 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -22,17 +23,18 @@ var ( type FilerOptions struct { masters *string ip *string + bindIp *string port *int publicPort *int collection *string defaultReplicaPlacement *string - redirectOnRead *bool disableDirListing *bool maxMB *int dirListingLimit *int dataCenter *string enableNotification *bool disableHttp *bool + cipher *bool // default leveldb directory, used in "weed server" mode defaultLevelDbDirectory *string @@ -42,16 +44,17 @@ func init() { cmdFiler.Run = runFiler // break init cycle f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") - f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") + f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") + f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") - f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit") f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size") f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center") f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") + f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") } var cmdFiler = &Command{ @@ -102,22 +105,23 @@ func (fo *FilerOptions) startFiler() { Masters: strings.Split(*fo.masters, ","), Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, - RedirectOnRead: *fo.redirectOnRead, DisableDirListing: *fo.disableDirListing, MaxMB: *fo.maxMB, DirListingLimit: *fo.dirListingLimit, DataCenter: *fo.dataCenter, DefaultLevelDbDir: defaultLevelDbDirectory, DisableHttp: *fo.disableHttp, - Port: *fo.port, + Host: *fo.ip, + Port: uint32(*fo.port), + Cipher: *fo.cipher, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) } if *fo.publicPort != 0 { - publicListeningAddress := *fo.ip + ":" + strconv.Itoa(*fo.publicPort) - glog.V(0).Infoln("Start Seaweed filer server", util.VERSION, "public at", publicListeningAddress) + publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort) + glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, 0) if e != nil { glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e) @@ -129,9 +133,9 @@ func (fo *FilerOptions) startFiler() { }() } - glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.VERSION, *fo.ip, *fo.port) + glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port) filerListener, e := util.NewListener( - *fo.ip+":"+strconv.Itoa(*fo.port), + *fo.bindIp+":"+strconv.Itoa(*fo.port), time.Duration(10)*time.Second, ) if e != nil { @@ -144,7 +148,7 @@ func (fo *FilerOptions) startFiler() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index e5979d786..2d6ba94d6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -16,9 +16,13 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -37,9 +41,10 @@ type CopyOptions struct { masterClient *wdclient.MasterClient concurrenctFiles *int concurrenctChunks *int - compressionLevel *int grpcDialOption grpc.DialOption masters []string + cipher bool + ttlSec int32 } func init() { @@ -52,7 +57,6 @@ func init() { copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") - copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } var cmdCopy = &Command{ @@ -107,9 +111,7 @@ func runCopy(cmd *Command, args []string) bool { filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - ctx := context.Background() - - masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress) + masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) if err != nil { fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) return false @@ -124,13 +126,17 @@ func runCopy(cmd *Command, args []string) bool { *copy.maxMB = int(maxMB) } copy.masters = masters + copy.cipher = cipher - copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters) - go copy.masterClient.KeepConnectedToMaster() - copy.masterClient.WaitUntilConnected() + ttl, err := needle.ReadTTL(*copy.ttl) + if err != nil { + fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err) + return false + } + copy.ttlSec = int32(ttl.Minutes()) * 60 if *cmdCopy.IsDebug { - util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") + grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") } fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles) @@ -153,7 +159,7 @@ func runCopy(cmd *Command, args []string) bool { filerHost: filerUrl.Host, filerGrpcAddress: filerGrpcAddress, } - if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil { + if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) return } @@ -164,13 +170,14 @@ func runCopy(cmd *Command, args []string) bool { return true } -func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) { - err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) } masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb + cipher = resp.Cipher return nil }) return @@ -215,9 +222,9 @@ type FileCopyWorker struct { filerGrpcAddress string } -func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error { +func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { for task := range fileCopyTaskChan { - if err := worker.doEachCopy(ctx, task); err != nil { + if err := worker.doEachCopy(task); err != nil { return err } } @@ -233,7 +240,7 @@ type FileCopyTask struct { gid uint32 } -func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error { +func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error { f, err := os.Open(task.sourceLocation) if err != nil { @@ -261,36 +268,55 @@ func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) } if chunkCount == 1 { - return worker.uploadFileAsOne(ctx, task, f) + return worker.uploadFileAsOne(task, f) } - return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize) + return worker.uploadFileInChunks(task, f, chunkCount, chunkSize) } -func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error { +func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error { // upload the file content fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) + data, err := ioutil.ReadAll(f) + if err != nil { + return err + } var chunks []*filer_pb.FileChunk + var assignResult *filer_pb.AssignVolumeResponse + var assignError error if task.fileSize > 0 { // assign a volume - assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - Ttl: *worker.options.ttl, + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + ParentPath: task.destinationUrlPath, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil }) if err != nil { - fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId - uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel) + uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth)) if err != nil { return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } @@ -299,18 +325,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy } fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: 0, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -325,13 +345,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy Mime: mimeType, Replication: *worker.options.replication, Collection: *worker.options.collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, } - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -342,7 +362,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy return nil } -func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { +func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -352,6 +372,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks) var wg sync.WaitGroup var uploadError error + var collection, replication string fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount) for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ { @@ -363,22 +384,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC <-concurrentChunks }() // assign a volume - assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - Ttl: *worker.options.ttl, + var assignResult *filer_pb.AssignVolumeResponse + var assignError error + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + ParentPath: task.destinationUrlPath, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil }) if err != nil { fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } + if err != nil { + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + if collection == "" { + collection = assignResult.Collection + } + if replication == "" { + replication = assignResult.Replication + } - uploadResult, err := operation.Upload(targetUrl, - fileName+"-"+strconv.FormatInt(i+1, 10), - io.NewSectionReader(f, i*chunkSize, chunkSize), - false, "", nil, assignResult.Auth) + uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) if err != nil { uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) return @@ -387,13 +428,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) return } - chunksChan <- &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: i * chunkSize, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - } + chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize) + fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) }(i) } @@ -410,11 +446,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds) + operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds) return uploadError } - if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -427,15 +463,15 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC FileSize: uint64(task.fileSize), FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), + Replication: replication, + Collection: collection, + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, } - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -457,18 +493,12 @@ func detectMimeType(f *os.File) string { } if err != nil { fmt.Printf("read head of %v: %v\n", f.Name(), err) - return "application/octet-stream" + return "" } f.Seek(0, io.SeekStart) mimeType := http.DetectContentType(head[:n]) + if mimeType == "application/octet-stream" { + return "" + } return mimeType } - -func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - - return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(clientConn) - return fn(client) - }, filerAddress, grpcDialOption) - -} diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 737f0d24a..40f2b570b 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -121,7 +121,6 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - return true } func validateOneEnabledInput(config *viper.Viper) { diff --git a/weed/command/fix.go b/weed/command/fix.go index 8903595fa..90d1c4893 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -70,6 +70,7 @@ func runFix(cmd *Command, args []string) bool { indexFileName := path.Join(*fixVolumePath, baseFileName+".idx") nm := needle_map.NewMemDb() + defer nm.Close() vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ diff --git a/weed/command/master.go b/weed/command/master.go index c4b11119b..cb6864edd 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -8,10 +8,12 @@ import ( "strings" "github.com/chrislusf/raft/protobuf" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/gorilla/mux" "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -43,10 +45,10 @@ type MasterOptions struct { func init() { cmdMaster.Run = runMaster // break init cycle m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") - m.ip = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address") + m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address") m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") + m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") @@ -81,7 +83,7 @@ func runMaster(cmd *Command, args []string) bool { util.LoadConfiguration("master", false) runtime.GOMAXPROCS(runtime.NumCPU()) - util.SetupProfiling(*masterCpuProfile, *masterMemProfile) + grace.SetupProfiling(*masterCpuProfile, *masterMemProfile) if err := util.TestFolderWritable(*m.metaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) @@ -109,7 +111,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port) - glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress) + glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) masterListener, e := util.NewListener(listeningAddress, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) @@ -129,11 +131,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) - glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) go grpcS.Serve(grpcL) go ms.MasterClient.KeepConnectedToMaster() @@ -146,6 +148,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { + glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers) masterAddress = masterIp + ":" + strconv.Itoa(masterPort) if peers != "" { cleanedPeers = strings.Split(peers, ",") @@ -170,6 +173,7 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { return &weed_server.MasterOption{ + Host: *m.ip, Port: *m.port, MetaFolder: *m.metaFolder, VolumeSizeLimitMB: *m.volumeSizeLimitMB, diff --git a/weed/command/mount.go b/weed/command/mount.go index f09b285f7..21c8e7744 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,23 +1,26 @@ package command import ( - "fmt" - "strconv" - "strings" + "os" ) type MountOptions struct { - filer *string - filerMountRootPath *string - dir *string - dirListCacheLimit *int64 - collection *string - replication *string - ttlSec *int - chunkSizeLimitMB *int - dataCenter *string - allowOthers *bool - umaskString *string + filer *string + filerMountRootPath *string + dir *string + dirListCacheLimit *int64 + collection *string + replication *string + ttlSec *int + chunkSizeLimitMB *int + cacheDir *string + cacheSizeMB *int64 + dataCenter *string + allowOthers *bool + umaskString *string + nonempty *bool + outsideContainerClusterMode *bool + asyncMetaDataCaching *bool } var ( @@ -35,12 +38,17 @@ func init() { mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") - mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") + mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files") + mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB (0 will disable cache)") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") + mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") + mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") + mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.") } var cmdMount = &Command{ @@ -58,21 +66,11 @@ var cmdMount = &Command{ On OS X, it requires OSXFUSE (http://osxfuse.github.com/). - `, -} - -func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { - hostnameAndPort := strings.Split(filer, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) - } + If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose, + the volume servers are not accessible by their own ip addresses. + In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming: + * All volume server containers are accessible through the same hostname or IP address as the filer. + * All volume server container ports are open external to the cluster. - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("filer port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil + `, } diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go index 80a5f9da4..25c4f72cf 100644 --- a/weed/command/mount_linux.go +++ b/weed/command/mount_linux.go @@ -138,9 +138,7 @@ func parseInfoFile(r io.Reader) ([]*Info, error) { } func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{ - fuse.AllowNonEmptyMount(), - } + return []fuse.MountOption{} } func checkMountPointAvailable(dir string) bool { diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 891810e61..4e83a44a0 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -3,6 +3,7 @@ package command import ( + "context" "fmt" "os" "os/user" @@ -12,19 +13,20 @@ import ( "strings" "time" - "github.com/jacobsa/daemonize" - "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) func runMount(cmd *Command, args []string) bool { - util.SetupProfiling(*mountCpuProfile, *mountMemProfile) + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64) if umaskErr != nil { @@ -32,27 +34,46 @@ func runMount(cmd *Command, args []string) bool { return false } - return RunMount( - *mountOptions.filer, - *mountOptions.filerMountRootPath, - *mountOptions.dir, - *mountOptions.collection, - *mountOptions.replication, - *mountOptions.dataCenter, - *mountOptions.chunkSizeLimitMB, - *mountOptions.allowOthers, - *mountOptions.ttlSec, - *mountOptions.dirListCacheLimit, - os.FileMode(umask), - ) + if len(args)>0 { + return false + } + + return RunMount(&mountOptions, os.FileMode(umask)) } -func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, - allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { +func RunMount(option *MountOptions, umask os.FileMode) bool { + + filer := *option.filer + // parse filer grpc address + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer) + if err != nil { + glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) + return true + } + + // try to connect to filer, filerBucketsPath may be useful later + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + var cipher bool + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err) + return true + } + + filerMountRootPath := *option.filerMountRootPath + dir := *option.dir + chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB util.LoadConfiguration("security", false) - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") return false @@ -90,7 +111,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente // Ensure target mount point availability if isValid := checkMountPointAvailable(dir); !isValid { glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir) - return false + return true } mountName := path.Base(dir) @@ -99,7 +120,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.VolumeName(mountName), fuse.FSName(filer + ":" + filerMountRootPath), fuse.Subtype("seaweedfs"), - fuse.NoAppleDouble(), + // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders fuse.NoAppleXattr(), fuse.NoBrowse(), fuse.AutoXattr(), @@ -110,68 +131,66 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente fuse.MaxReadahead(1024 * 128), fuse.AsyncRead(), fuse.WritebackCache(), - fuse.AllowNonEmptyMount(), } options = append(options, osSpecificMountOptions()...) - - if allowOthers { + if *option.allowOthers { options = append(options, fuse.AllowOther()) } + if *option.nonempty { + options = append(options, fuse.AllowNonEmptyMount()) + } + // find mount point + mountRoot := filerMountRootPath + if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") { + mountRoot = mountRoot[0 : len(mountRoot)-1] + } + + seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ + FilerGrpcAddress: filerGrpcAddress, + GrpcDialOption: grpcDialOption, + FilerMountRootPath: mountRoot, + Collection: *option.collection, + Replication: *option.replication, + TtlSec: int32(*option.ttlSec), + ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, + CacheDir: *option.cacheDir, + CacheSizeMB: *option.cacheSizeMB, + DataCenter: *option.dataCenter, + DirListCacheLimit: *option.dirListCacheLimit, + EntryCacheTtl: 3 * time.Second, + MountUid: uid, + MountGid: gid, + MountMode: mountMode, + MountCtime: fileInfo.ModTime(), + MountMtime: time.Now(), + Umask: umask, + OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode, + AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching, + Cipher: cipher, + }) + + // mount c, err := fuse.Mount(dir, options...) if err != nil { glog.V(0).Infof("mount: %v", err) - daemonize.SignalOutcome(err) return true } + defer fuse.Unmount(dir) - util.OnInterrupt(func() { + grace.OnInterrupt(func() { fuse.Unmount(dir) c.Close() }) - filerGrpcAddress, err := parseFilerGrpcAddress(filer) - if err != nil { - glog.V(0).Infof("parseFilerGrpcAddress: %v", err) - daemonize.SignalOutcome(err) - return true - } - - mountRoot := filerMountRootPath - if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") { - mountRoot = mountRoot[0 : len(mountRoot)-1] - } - - daemonize.SignalOutcome(nil) - - err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ - FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), - FilerMountRootPath: mountRoot, - Collection: collection, - Replication: replication, - TtlSec: int32(ttlSec), - ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - DataCenter: dataCenter, - DirListCacheLimit: dirListCacheLimit, - EntryCacheTtl: 3 * time.Second, - MountUid: uid, - MountGid: gid, - MountMode: mountMode, - MountCtime: fileInfo.ModTime(), - MountMtime: time.Now(), - Umask: umask, - })) - if err != nil { - fuse.Unmount(dir) - } + glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir) + err = fs.Serve(c, seaweedFileSystem) // check if the mount process has an error to report <-c.Ready if err := c.MountError; err != nil { glog.V(0).Infof("mount process: %v", err) - daemonize.SignalOutcome(err) return true } diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go new file mode 100644 index 000000000..b4b5855ff --- /dev/null +++ b/weed/command/msg_broker.go @@ -0,0 +1,114 @@ +package command + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + messageBrokerStandaloneOptions MessageBrokerOptions +) + +type MessageBrokerOptions struct { + filer *string + ip *string + port *int + cpuprofile *string + memprofile *string +} + +func init() { + cmdMsgBroker.Run = runMsgBroker // break init cycle + messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") + messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") + messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") + messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") + messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") +} + +var cmdMsgBroker = &Command{ + UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]", + Short: "start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMsgBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + return messageBrokerStandaloneOptions.startQueueServer() + +} + +func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { + + grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) + + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + if err != nil { + glog.Fatal(err) + return false + } + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + cipher := false + + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + break + } + } + + qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ + Filers: []string{*msgBrokerOpt.filer}, + DefaultReplication: "", + MaxMB: 0, + Ip: *msgBrokerOpt.ip, + Port: *msgBrokerOpt.port, + Cipher: cipher, + }, grpcDialOption) + + // start grpc listener + grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/s3.go b/weed/command/s3.go index 10a486657..7ebd4fab0 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -1,10 +1,13 @@ package command import ( + "context" "fmt" "net/http" "time" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/gorilla/mux" @@ -19,29 +22,89 @@ var ( ) type S3Options struct { - filer *string - filerBucketsPath *string - port *int - domainName *string - tlsPrivateKey *string - tlsCertificate *string + filer *string + port *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string } func init() { cmdS3.Run = runS3 // break init cycle s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") - s3StandaloneOptions.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}") + s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") } var cmdS3 = &Command{ - UsageLine: "s3 -port=8333 -filer=<ip:port>", + UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]", Short: "start a s3 API compatible server that is backed by a filer", Long: `start a s3 API compatible server that is backed by a filer. + By default, you can use any access key and secret key to access the S3 APIs. + To enable credential based access, create a config.json file similar to this: + +{ + "identities": [ + { + "name": "some_name", + "credentials": [ + { + "accessKey": "some_access_key1", + "secretKey": "some_secret_key1" + } + ], + "actions": [ + "Admin", + "Read", + "Write" + ] + }, + { + "name": "some_read_only_user", + "credentials": [ + { + "accessKey": "some_access_key2", + "secretKey": "some_secret_key2" + } + ], + "actions": [ + "Read" + ] + }, + { + "name": "some_normal_user", + "credentials": [ + { + "accessKey": "some_access_key3", + "secretKey": "some_secret_key3" + } + ], + "actions": [ + "Read", + "Write" + ] + }, + { + "name": "user_limited_to_bucket1", + "credentials": [ + { + "accessKey": "some_access_key4", + "secretKey": "some_secret_key4" + } + ], + "actions": [ + "Read:bucket1", + "Write:bucket1" + ] + } + ] +} + `, } @@ -55,20 +118,44 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false } + filerBucketsPath := "/buckets" + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + filerBucketsPath = resp.DirBuckets + glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath) + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + break + } + } + router := mux.NewRouter().SkipClean(true) _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ Filer: *s3opt.filer, FilerGrpcAddress: filerGrpcAddress, + Config: *s3opt.config, DomainName: *s3opt.domainName, - BucketsPath: *s3opt.filerBucketsPath, - GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + BucketsPath: filerBucketsPath, + GrpcDialOption: grpcDialOption, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) @@ -83,12 +170,12 @@ func (s3opt *S3Options) startS3Server() bool { } if *s3opt.tlsPrivateKey != "" { - glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3opt.port) + glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port) if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } } else { - glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3opt.port) + glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port) if err = httpS.Serve(s3ApiListener); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index ab658735f..b199f2d2d 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -18,7 +18,7 @@ var cmdScaffold = &Command{ For example, the filer.toml mysql password can be overwritten by environment variable export WEED_MYSQL_PASSWORD=some_password Environment variable rules: - * Prefix fix with "WEED_" + * Prefix the variable name with "WEED_" * Upppercase the reset of variable name. * Replace '.' with '_' @@ -74,7 +74,12 @@ const ( # with http DELETE, by default the filer would check whether a folder is empty. # recursive_delete will delete all sub folders and files, similar to "rm -Rf" recursive_delete = false - +# directories under this folder will be automatically creating a separate bucket +buckets_folder = "/buckets" +buckets_fsync = [ # a list of buckets with all write requests fsync=true + "important_bucket", + "should_always_fsync", +] #################################################### # The following are filer store options @@ -136,13 +141,13 @@ hosts=[ "localhost:9042", ] -[redis] +[redis2] enabled = false address = "localhost:6379" password = "" database = 0 -[redis_cluster] +[redis_cluster2] enabled = false addresses = [ "localhost:30001", @@ -163,11 +168,11 @@ enabled = false servers = "localhost:2379" timeout = "3s" -[tikv] +[mongodb] enabled = false -pdAddress = "192.168.199.113:2379" - - +uri = "mongodb://localhost:27017" +option_pool_size = 0 +database = "seaweedfs" ` NOTIFICATION_TOML_EXAMPLE = ` @@ -262,6 +267,7 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials fil region = "us-east-2" bucket = "your_bucket_name" # an existing bucket directory = "/" # destination directory +endpoint = "" [sink.google_cloud_storage] # read credentials doc at https://cloud.google.com/docs/authentication/getting-started @@ -323,6 +329,10 @@ key = "" cert = "" key = "" +[grpc.msg_broker] +cert = "" +key = "" + # use this for any place needs a grpc client # i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload" [grpc.client] @@ -352,15 +362,19 @@ key = "" [master.maintenance] # periodically run these scripts are the same as running them from 'weed shell' scripts = """ + lock ec.encode -fullPercent=95 -quietFor=1h ec.rebuild -force ec.balance -force volume.balance -force + volume.fix.replication + unlock """ sleep_minutes = 17 # sleep minutes between each script execution [master.filer] -default_filer_url = "http://localhost:8888/" +default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands + [master.sequencer] type = "memory" # Choose [memory|etcd] type for storing the file id sequence @@ -378,13 +392,27 @@ sequencer_etcd_urls = "http://127.0.0.1:2379" aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). region = "us-east-2" bucket = "your_bucket_name" # an existing bucket + endpoint = "" # create this number of logical volumes if no more writable volumes +# count_x means how many copies of data. +# e.g.: +# 000 has only one copy, copy_1 +# 010 and 001 has two copies, copy_2 +# 011 has only 3 copies, copy_3 [master.volume_growth] -count_1 = 7 # create 1 x 7 = 7 actual volumes -count_2 = 6 # create 2 x 6 = 12 actual volumes -count_3 = 3 # create 3 x 3 = 9 actual volumes -count_other = 1 # create n x 1 = n actual volumes +copy_1 = 7 # create 1 x 7 = 7 actual volumes +copy_2 = 6 # create 2 x 6 = 12 actual volumes +copy_3 = 3 # create 3 x 3 = 9 actual volumes +copy_other = 1 # create n x 1 = n actual volumes + +# configuration flags for replication +[master.replication] +# any replication counts should be considered minimums. If you specify 010 and +# have 3 different racks, that's still considered writable. Writes will still +# try to replicate to all available volumes. You should only use this option +# if you are doing your own replication or periodic sync of volumes. +treat_replication_as_minimums = false ` ) diff --git a/weed/command/server.go b/weed/command/server.go index 6aa68b6d2..c006f00eb 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -18,10 +18,11 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + msgBrokerOptions MessageBrokerOptions ) func init() { @@ -45,7 +46,7 @@ var cmdServer = &Command{ } var ( - serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") + serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") @@ -53,10 +54,11 @@ var ( serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") + volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") + isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string ) @@ -78,10 +80,10 @@ func init() { filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") - filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") + filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") @@ -92,11 +94,13 @@ func init() { serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") - s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}") s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") + s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") + + msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") } @@ -114,11 +118,10 @@ func runServer(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - if *filerOptions.redirectOnRead { + if *isStartingS3 { *isStartingFiler = true } - - if *isStartingS3 { + if *isStartingMsgBroker { *isStartingFiler = true } @@ -129,13 +132,15 @@ func runServer(cmd *Command, args []string) bool { masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp filerOptions.masters = &peers - filerOptions.ip = serverBindIp + filerOptions.ip = serverIp + filerOptions.bindIp = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp serverOptions.v.masters = &peers serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack + msgBrokerOptions.ip = serverIp serverOptions.v.pulseSeconds = pulseSeconds masterOptions.pulseSeconds = pulseSeconds @@ -148,6 +153,7 @@ func runServer(cmd *Command, args []string) bool { filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) s3Options.filer = &filerAddress + msgBrokerOptions.filer = &filerAddress if *filerOptions.defaultReplicaPlacement == "" { *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication @@ -191,6 +197,13 @@ func runServer(cmd *Command, args []string) bool { }() } + if *isStartingMsgBroker { + go func() { + time.Sleep(2 * time.Second) + msgBrokerOptions.startQueueServer() + }() + } + // start volume server { go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) diff --git a/weed/command/shell.go b/weed/command/shell.go index dcf70608f..6dd768f47 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -9,14 +9,14 @@ import ( ) var ( - shellOptions shell.ShellOptions - shellInitialFilerUrl *string + shellOptions shell.ShellOptions + shellInitialFiler *string ) func init() { cmdShell.Run = runShell // break init cycle shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") - shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url") + shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port") } var cmdShell = &Command{ @@ -32,12 +32,13 @@ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - var filerPwdErr error - shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl) - if filerPwdErr != nil { - fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr) + var err error + shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) + if err != nil { + fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err) return false } + shellOptions.Directory = "/" shell.RunShell(shellOptions) diff --git a/weed/command/upload.go b/weed/command/upload.go index d71046131..358897aee 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -16,14 +16,15 @@ var ( ) type UploadOptions struct { - master *string - dir *string - include *string - replication *string - collection *string - dataCenter *string - ttl *string - maxMB *int + master *string + dir *string + include *string + replication *string + collection *string + dataCenter *string + ttl *string + maxMB *int + usePublicUrl *bool } func init() { @@ -37,6 +38,7 @@ func init() { upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit") + upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server") } var cmdUpload = &Command{ @@ -79,9 +81,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, - *upload.replication, *upload.collection, *upload.dataCenter, - *upload.ttl, *upload.maxMB) + results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -98,9 +98,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, - *upload.replication, *upload.collection, *upload.dataCenter, - *upload.ttl, *upload.maxMB) + results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/weed/command/version.go b/weed/command/version.go index 8fdd68ec8..9caf7dc4e 100644 --- a/weed/command/version.go +++ b/weed/command/version.go @@ -19,6 +19,6 @@ func runVersion(cmd *Command, args []string) bool { cmd.Usage() } - fmt.Printf("version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + fmt.Printf("version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) return true } diff --git a/weed/command/volume.go b/weed/command/volume.go index 9d665d143..f942ec50b 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -10,9 +10,11 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/httpdown" @@ -56,7 +58,7 @@ func init() { cmdVolume.Run = runVolume // break init cycle v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") - v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") + v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") @@ -83,7 +85,7 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") + maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") ) @@ -92,7 +94,7 @@ func runVolume(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) runtime.GOMAXPROCS(runtime.NumCPU()) - util.SetupProfiling(*v.cpuProfile, *v.memProfile) + grace.SetupProfiling(*v.cpuProfile, *v.memProfile) v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) @@ -126,7 +128,8 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } if *v.ip == "" { - *v.ip = "127.0.0.1" + *v.ip = util.DetectedHostAddress() + glog.V(0).Infof("detected volume server ip address: %v", *v.ip) } if *v.publicPort == 0 { @@ -181,7 +184,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v clusterHttpServer := v.startClusterHttpService(volumeMux) stopChain := make(chan struct{}) - util.OnInterrupt(func() { + grace.OnInterrupt(func() { fmt.Println("volume server has be killed") var startTime time.Time @@ -234,7 +237,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) go func() { @@ -247,7 +250,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { glog.Fatalf("Volume server listener error:%v", e) @@ -274,7 +277,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) - glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) + glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress) listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { glog.Fatalf("Volume server listener error:%v", e) diff --git a/weed/command/watch.go b/weed/command/watch.go new file mode 100644 index 000000000..b46707a62 --- /dev/null +++ b/weed/command/watch.go @@ -0,0 +1,65 @@ +package command + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + cmdWatch.Run = runWatch // break init cycle +} + +var cmdWatch = &Command{ + UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]", + Short: "see recent changes on a filer", + Long: `See recent changes on a filer. + + `, +} + +var ( + watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") + watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +) + +func runWatch(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ClientName: "watch", + PathPrefix: *watchTarget, + SinceNs: time.Now().Add(-*watchStart).UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + fmt.Printf("events: %+v\n", resp.EventNotification) + } + + }) + if watchErr != nil { + fmt.Printf("watch %s: %v\n", *watchFiler, watchErr) + } + + return true +} diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 0e6f89040..b9676c909 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -1,13 +1,17 @@ package command import ( + "context" "fmt" "net/http" + "os" "os/user" "strconv" "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" @@ -23,6 +27,8 @@ type WebDavOption struct { collection *string tlsPrivateKey *string tlsCertificate *string + cacheDir *string + cacheSizeMB *int64 } func init() { @@ -32,11 +38,13 @@ func init() { webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") + webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB") } var cmdWebDav = &Command{ UsageLine: "webdav -port=7333 -filer=<ip:port>", - Short: "<unstable> start a webdav server that is backed by a filer", + Short: "start a webdav server that is backed by a filer", Long: `start a webdav server that is backed by a filer. `, @@ -46,7 +54,7 @@ func runWebDav(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port) + glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.Version(), *webDavStandaloneOptions.port) return webDavStandaloneOptions.startWebDav() @@ -54,12 +62,6 @@ func runWebDav(cmd *Command, args []string) bool { func (wo *WebDavOption) startWebDav() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer) - if err != nil { - glog.Fatal(err) - return false - } - // detect current user uid, gid := uint32(0), uint32(0) if u, err := user.Current(); err == nil { @@ -71,13 +73,45 @@ func (wo *WebDavOption) startWebDav() bool { } } + // parse filer grpc address + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer) + if err != nil { + glog.Fatal(err) + return false + } + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + var cipher bool + // connect to filer + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + break + } + } + ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ Filer: *wo.filer, FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + GrpcDialOption: grpcDialOption, Collection: *wo.collection, Uid: uid, Gid: gid, + Cipher: cipher, + CacheDir: *wo.cacheDir, + CacheSizeMB: *wo.cacheSizeMB, }) if webdavServer_err != nil { glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) @@ -92,12 +126,12 @@ func (wo *WebDavOption) startWebDav() bool { } if *wo.tlsPrivateKey != "" { - glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.VERSION, *wo.port) + glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.Version(), *wo.port) if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil { glog.Fatalf("WebDav Server Fail to serve: %v", err) } } else { - glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.VERSION, *wo.port) + glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.Version(), *wo.port) if err = httpS.Serve(webDavListener); err != nil { glog.Fatalf("WebDav Server Fail to serve: %v", err) } |
