aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
committershibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
commit40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
treea085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/command
parentd892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parentfbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
downloadseaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go2
-rw-r--r--weed/command/benchmark.go74
-rw-r--r--weed/command/command.go16
-rw-r--r--weed/command/compact.go2
-rw-r--r--weed/command/download.go1
-rw-r--r--weed/command/export.go2
-rw-r--r--weed/command/filer.go24
-rw-r--r--weed/command/filer_copy.go172
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/fix.go1
-rw-r--r--weed/command/master.go16
-rw-r--r--weed/command/mount.go60
-rw-r--r--weed/command/mount_linux.go4
-rw-r--r--weed/command/mount_std.go143
-rw-r--r--weed/command/msg_broker.go114
-rw-r--r--weed/command/s3.go113
-rw-r--r--weed/command/scaffold.go54
-rw-r--r--weed/command/server.go37
-rw-r--r--weed/command/shell.go15
-rw-r--r--weed/command/upload.go26
-rw-r--r--weed/command/version.go2
-rw-r--r--weed/command/volume.go19
-rw-r--r--weed/command/watch.go65
-rw-r--r--weed/command/webdav.go56
24 files changed, 733 insertions, 286 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index eb2b5ba4a..615be80cf 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool {
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
+ if err = v.Compact2(30*1024*1024*1024, 0); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 382e7c850..de44fac75 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -19,6 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -40,6 +41,8 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
+ grpcRead *bool
+ fsync *bool
}
var (
@@ -64,6 +67,8 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
+ b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
sharedBytes = make([]byte, 1024)
}
@@ -110,7 +115,7 @@ func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
}
@@ -124,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
@@ -224,9 +229,10 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
fp := &operation.FilePart{
- Reader: &FakeReader{id: uint64(id), size: fileSize},
+ Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content
+ Fsync: *b.fsync,
}
ar := &operation.VolumeAssignRequest{
Count: 1,
@@ -238,7 +244,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
+ if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -278,23 +284,61 @@ func readFiles(fileIdLineChan chan string, s *stat) {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
- url, err := b.masterClient.LookupFileId(fid)
- if err != nil {
- s.failed++
- println("!!!! ", fid, " location not found!!!!!")
- continue
+ var bytesRead int
+ var err error
+ if *b.grpcRead {
+ volumeServer, err := b.masterClient.LookupVolumeServer(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
+ } else {
+ url, err := b.masterClient.LookupFileId(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ var bytes []byte
+ bytes, err = util.Get(url)
+ bytesRead = len(bytes)
}
- if bytesRead, err := util.Get(url); err == nil {
+ if err == nil {
s.completed++
- s.transferred += int64(len(bytesRead))
+ s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
+ fmt.Printf("Failed to read %s error:%v\n", fid, err)
}
}
}
+func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
+ err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid})
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, respErr := fileGetClient.Recv()
+ if resp != nil {
+ bytesRead += len(resp.Data)
+ }
+ if respErr != nil {
+ if respErr == io.EOF {
+ return nil
+ }
+ return respErr
+ }
+ }
+ })
+ return
+}
+
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
@@ -509,8 +553,9 @@ func (s *stats) printStats() {
// a fake reader to generate content to upload
type FakeReader struct {
- id uint64 // an id number
- size int64 // max bytes
+ id uint64 // an id number
+ size int64 // max bytes
+ random *rand.Rand
}
func (l *FakeReader) Read(p []byte) (n int, err error) {
@@ -526,6 +571,7 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
for i := 0; i < 8; i++ {
p[i] = byte(l.id >> uint(i*8))
}
+ l.random.Read(p[8:])
}
l.size -= int64(n)
return
diff --git a/weed/command/command.go b/weed/command/command.go
index 79c00d4cd..9a41a8a7c 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -12,20 +12,22 @@ var Commands = []*Command{
cmdBackup,
cmdCompact,
cmdCopy,
- cmdFix,
+ cmdDownload,
+ cmdExport,
+ cmdFiler,
cmdFilerReplicate,
- cmdServer,
+ cmdFix,
cmdMaster,
- cmdFiler,
+ cmdMount,
cmdS3,
- cmdUpload,
- cmdDownload,
+ cmdMsgBroker,
cmdScaffold,
+ cmdServer,
cmdShell,
+ cmdWatch,
+ cmdUpload,
cmdVersion,
cmdVolume,
- cmdExport,
- cmdMount,
cmdWebDav,
}
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 85313b749..4e28aa725 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(preallocate); err != nil {
+ if err = v.Compact2(preallocate, 0); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/command/download.go b/weed/command/download.go
index b3e33defd..be0eb47e5 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -71,6 +71,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
+ io.Copy(ioutil.Discard, rc)
return err
}
defer f.Close()
diff --git a/weed/command/export.go b/weed/command/export.go
index 8d664ad3b..8c32b3f4d 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -195,6 +195,8 @@ func runExport(cmd *Command, args []string) bool {
vid := needle.VolumeId(*export.volumeId)
needleMap := needle_map.NewMemDb()
+ defer needleMap.Close()
+
if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
diff --git a/weed/command/filer.go b/weed/command/filer.go
index ea8392fac..e258b695d 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -9,6 +9,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -22,17 +23,18 @@ var (
type FilerOptions struct {
masters *string
ip *string
+ bindIp *string
port *int
publicPort *int
collection *string
defaultReplicaPlacement *string
- redirectOnRead *bool
disableDirListing *bool
maxMB *int
dirListingLimit *int
dataCenter *string
enableNotification *bool
disableHttp *bool
+ cipher *bool
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -42,16 +44,17 @@ func init() {
cmdFiler.Run = runFiler // break init cycle
f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
- f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
+ f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
+ f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
- f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
+ f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
}
var cmdFiler = &Command{
@@ -102,22 +105,23 @@ func (fo *FilerOptions) startFiler() {
Masters: strings.Split(*fo.masters, ","),
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
- RedirectOnRead: *fo.redirectOnRead,
DisableDirListing: *fo.disableDirListing,
MaxMB: *fo.maxMB,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
- Port: *fo.port,
+ Host: *fo.ip,
+ Port: uint32(*fo.port),
+ Cipher: *fo.cipher,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
}
if *fo.publicPort != 0 {
- publicListeningAddress := *fo.ip + ":" + strconv.Itoa(*fo.publicPort)
- glog.V(0).Infoln("Start Seaweed filer server", util.VERSION, "public at", publicListeningAddress)
+ publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort)
+ glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, 0)
if e != nil {
glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
@@ -129,9 +133,9 @@ func (fo *FilerOptions) startFiler() {
}()
}
- glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.VERSION, *fo.ip, *fo.port)
+ glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
filerListener, e := util.NewListener(
- *fo.ip+":"+strconv.Itoa(*fo.port),
+ *fo.bindIp+":"+strconv.Itoa(*fo.port),
time.Duration(10)*time.Second,
)
if e != nil {
@@ -144,7 +148,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index e5979d786..2d6ba94d6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -16,9 +16,13 @@ import (
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -37,9 +41,10 @@ type CopyOptions struct {
masterClient *wdclient.MasterClient
concurrenctFiles *int
concurrenctChunks *int
- compressionLevel *int
grpcDialOption grpc.DialOption
masters []string
+ cipher bool
+ ttlSec int32
}
func init() {
@@ -52,7 +57,6 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
- copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
var cmdCopy = &Command{
@@ -107,9 +111,7 @@ func runCopy(cmd *Command, args []string) bool {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- ctx := context.Background()
-
- masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
if err != nil {
fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
return false
@@ -124,13 +126,17 @@ func runCopy(cmd *Command, args []string) bool {
*copy.maxMB = int(maxMB)
}
copy.masters = masters
+ copy.cipher = cipher
- copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
- go copy.masterClient.KeepConnectedToMaster()
- copy.masterClient.WaitUntilConnected()
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
if *cmdCopy.IsDebug {
- util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
+ grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
@@ -153,7 +159,7 @@ func runCopy(cmd *Command, args []string) bool {
filerHost: filerUrl.Host,
filerGrpcAddress: filerGrpcAddress,
}
- if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
+ if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
return
}
@@ -164,13 +170,14 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
- err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ cipher = resp.Cipher
return nil
})
return
@@ -215,9 +222,9 @@ type FileCopyWorker struct {
filerGrpcAddress string
}
-func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
+func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
for task := range fileCopyTaskChan {
- if err := worker.doEachCopy(ctx, task); err != nil {
+ if err := worker.doEachCopy(task); err != nil {
return err
}
}
@@ -233,7 +240,7 @@ type FileCopyTask struct {
gid uint32
}
-func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
+func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
f, err := os.Open(task.sourceLocation)
if err != nil {
@@ -261,36 +268,55 @@ func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask)
}
if chunkCount == 1 {
- return worker.uploadFileAsOne(ctx, task, f)
+ return worker.uploadFileAsOne(task, f)
}
- return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
-func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
+func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return err
+ }
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
- fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
+ uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -299,18 +325,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
}
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: 0,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
+ chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -325,13 +345,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -342,7 +362,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
return nil
}
-func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
+func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -352,6 +372,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
+ var collection, replication string
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +384,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
<-concurrentChunks
}()
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.NewSectionReader(f, i*chunkSize, chunkSize),
- false, "", nil, assignResult.Auth)
+ uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -387,13 +428,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
return
}
- chunksChan <- &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- }
+ chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
+
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
}
@@ -410,11 +446,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+ operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds)
return uploadError
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -427,15 +463,15 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ Replication: replication,
+ Collection: collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -457,18 +493,12 @@ func detectMimeType(f *os.File) string {
}
if err != nil {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
- return "application/octet-stream"
+ return ""
}
f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
+ if mimeType == "application/octet-stream" {
+ return ""
+ }
return mimeType
}
-
-func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return util.WithCachedGrpcClient(ctx, func(ctx context.Context, clientConn *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(clientConn)
- return fn(client)
- }, filerAddress, grpcDialOption)
-
-}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 737f0d24a..40f2b570b 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -121,7 +121,6 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
- return true
}
func validateOneEnabledInput(config *viper.Viper) {
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 8903595fa..90d1c4893 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -70,6 +70,7 @@ func runFix(cmd *Command, args []string) bool {
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
nm := needle_map.NewMemDb()
+ defer nm.Close()
vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{
diff --git a/weed/command/master.go b/weed/command/master.go
index c4b11119b..cb6864edd 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -8,10 +8,12 @@ import (
"strings"
"github.com/chrislusf/raft/protobuf"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -43,10 +45,10 @@ type MasterOptions struct {
func init() {
cmdMaster.Run = runMaster // break init cycle
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
- m.ip = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
+ m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address")
m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
- m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
+ m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
@@ -81,7 +83,7 @@ func runMaster(cmd *Command, args []string) bool {
util.LoadConfiguration("master", false)
runtime.GOMAXPROCS(runtime.NumCPU())
- util.SetupProfiling(*masterCpuProfile, *masterMemProfile)
+ grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
if err := util.TestFolderWritable(*m.metaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
@@ -109,7 +111,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
- glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress)
+ glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
@@ -129,11 +131,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
- glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort)
+ glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
go grpcS.Serve(grpcL)
go ms.MasterClient.KeepConnectedToMaster()
@@ -146,6 +148,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
+ glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
masterAddress = masterIp + ":" + strconv.Itoa(masterPort)
if peers != "" {
cleanedPeers = strings.Split(peers, ",")
@@ -170,6 +173,7 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
return &weed_server.MasterOption{
+ Host: *m.ip,
Port: *m.port,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: *m.volumeSizeLimitMB,
diff --git a/weed/command/mount.go b/weed/command/mount.go
index f09b285f7..21c8e7744 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,23 +1,26 @@
package command
import (
- "fmt"
- "strconv"
- "strings"
+ "os"
)
type MountOptions struct {
- filer *string
- filerMountRootPath *string
- dir *string
- dirListCacheLimit *int64
- collection *string
- replication *string
- ttlSec *int
- chunkSizeLimitMB *int
- dataCenter *string
- allowOthers *bool
- umaskString *string
+ filer *string
+ filerMountRootPath *string
+ dir *string
+ dirListCacheLimit *int64
+ collection *string
+ replication *string
+ ttlSec *int
+ chunkSizeLimitMB *int
+ cacheDir *string
+ cacheSizeMB *int64
+ dataCenter *string
+ allowOthers *bool
+ umaskString *string
+ nonempty *bool
+ outsideContainerClusterMode *bool
+ asyncMetaDataCaching *bool
}
var (
@@ -35,12 +38,17 @@ func init() {
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
- mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
+ mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files")
+ mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
+ mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
+ mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
+ mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.")
}
var cmdMount = &Command{
@@ -58,21 +66,11 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
- `,
-}
-
-func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
- }
+ If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
+ the volume servers are not accessible by their own ip addresses.
+ In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
+ * All volume server containers are accessible through the same hostname or IP address as the filer.
+ * All volume server container ports are open external to the cluster.
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
+ `,
}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
index 80a5f9da4..25c4f72cf 100644
--- a/weed/command/mount_linux.go
+++ b/weed/command/mount_linux.go
@@ -138,9 +138,7 @@ func parseInfoFile(r io.Reader) ([]*Info, error) {
}
func osSpecificMountOptions() []fuse.MountOption {
- return []fuse.MountOption{
- fuse.AllowNonEmptyMount(),
- }
+ return []fuse.MountOption{}
}
func checkMountPointAvailable(dir string) bool {
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 891810e61..4e83a44a0 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -3,6 +3,7 @@
package command
import (
+ "context"
"fmt"
"os"
"os/user"
@@ -12,19 +13,20 @@ import (
"strings"
"time"
- "github.com/jacobsa/daemonize"
-
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
func runMount(cmd *Command, args []string) bool {
- util.SetupProfiling(*mountCpuProfile, *mountMemProfile)
+ grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
if umaskErr != nil {
@@ -32,27 +34,46 @@ func runMount(cmd *Command, args []string) bool {
return false
}
- return RunMount(
- *mountOptions.filer,
- *mountOptions.filerMountRootPath,
- *mountOptions.dir,
- *mountOptions.collection,
- *mountOptions.replication,
- *mountOptions.dataCenter,
- *mountOptions.chunkSizeLimitMB,
- *mountOptions.allowOthers,
- *mountOptions.ttlSec,
- *mountOptions.dirListCacheLimit,
- os.FileMode(umask),
- )
+ if len(args)>0 {
+ return false
+ }
+
+ return RunMount(&mountOptions, os.FileMode(umask))
}
-func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
- allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool {
+func RunMount(option *MountOptions, umask os.FileMode) bool {
+
+ filer := *option.filer
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
+ if err != nil {
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
+ return true
+ }
+
+ // try to connect to filer, filerBucketsPath may be useful later
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var cipher bool
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer grpc address %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.Infof("failed to talk to filer %s: %v", filerGrpcAddress, err)
+ return true
+ }
+
+ filerMountRootPath := *option.filerMountRootPath
+ dir := *option.dir
+ chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
util.LoadConfiguration("security", false)
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
@@ -90,7 +111,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
- return false
+ return true
}
mountName := path.Base(dir)
@@ -99,7 +120,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
fuse.VolumeName(mountName),
fuse.FSName(filer + ":" + filerMountRootPath),
fuse.Subtype("seaweedfs"),
- fuse.NoAppleDouble(),
+ // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
fuse.NoBrowse(),
fuse.AutoXattr(),
@@ -110,68 +131,66 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
fuse.MaxReadahead(1024 * 128),
fuse.AsyncRead(),
fuse.WritebackCache(),
- fuse.AllowNonEmptyMount(),
}
options = append(options, osSpecificMountOptions()...)
-
- if allowOthers {
+ if *option.allowOthers {
options = append(options, fuse.AllowOther())
}
+ if *option.nonempty {
+ options = append(options, fuse.AllowNonEmptyMount())
+ }
+ // find mount point
+ mountRoot := filerMountRootPath
+ if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
+ mountRoot = mountRoot[0 : len(mountRoot)-1]
+ }
+
+ seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
+ FilerGrpcAddress: filerGrpcAddress,
+ GrpcDialOption: grpcDialOption,
+ FilerMountRootPath: mountRoot,
+ Collection: *option.collection,
+ Replication: *option.replication,
+ TtlSec: int32(*option.ttlSec),
+ ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ CacheDir: *option.cacheDir,
+ CacheSizeMB: *option.cacheSizeMB,
+ DataCenter: *option.dataCenter,
+ DirListCacheLimit: *option.dirListCacheLimit,
+ EntryCacheTtl: 3 * time.Second,
+ MountUid: uid,
+ MountGid: gid,
+ MountMode: mountMode,
+ MountCtime: fileInfo.ModTime(),
+ MountMtime: time.Now(),
+ Umask: umask,
+ OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
+ AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching,
+ Cipher: cipher,
+ })
+
+ // mount
c, err := fuse.Mount(dir, options...)
if err != nil {
glog.V(0).Infof("mount: %v", err)
- daemonize.SignalOutcome(err)
return true
}
+ defer fuse.Unmount(dir)
- util.OnInterrupt(func() {
+ grace.OnInterrupt(func() {
fuse.Unmount(dir)
c.Close()
})
- filerGrpcAddress, err := parseFilerGrpcAddress(filer)
- if err != nil {
- glog.V(0).Infof("parseFilerGrpcAddress: %v", err)
- daemonize.SignalOutcome(err)
- return true
- }
-
- mountRoot := filerMountRootPath
- if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
- mountRoot = mountRoot[0 : len(mountRoot)-1]
- }
-
- daemonize.SignalOutcome(nil)
-
- err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{
- FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
- FilerMountRootPath: mountRoot,
- Collection: collection,
- Replication: replication,
- TtlSec: int32(ttlSec),
- ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
- DataCenter: dataCenter,
- DirListCacheLimit: dirListCacheLimit,
- EntryCacheTtl: 3 * time.Second,
- MountUid: uid,
- MountGid: gid,
- MountMode: mountMode,
- MountCtime: fileInfo.ModTime(),
- MountMtime: time.Now(),
- Umask: umask,
- }))
- if err != nil {
- fuse.Unmount(dir)
- }
+ glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
+ err = fs.Serve(c, seaweedFileSystem)
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
glog.V(0).Infof("mount process: %v", err)
- daemonize.SignalOutcome(err)
return true
}
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
new file mode 100644
index 000000000..b4b5855ff
--- /dev/null
+++ b/weed/command/msg_broker.go
@@ -0,0 +1,114 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ messageBrokerStandaloneOptions MessageBrokerOptions
+)
+
+type MessageBrokerOptions struct {
+ filer *string
+ ip *string
+ port *int
+ cpuprofile *string
+ memprofile *string
+}
+
+func init() {
+ cmdMsgBroker.Run = runMsgBroker // break init cycle
+ messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
+ messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdMsgBroker = &Command{
+ UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
+ Short: "start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMsgBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return messageBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
+
+ grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
+ cipher := false
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
+ Filers: []string{*msgBrokerOpt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Ip: *msgBrokerOpt.ip,
+ Port: *msgBrokerOpt.port,
+ Cipher: cipher,
+ }, grpcDialOption)
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 10a486657..7ebd4fab0 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,10 +1,13 @@
package command
import (
+ "context"
"fmt"
"net/http"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/gorilla/mux"
@@ -19,29 +22,89 @@ var (
)
type S3Options struct {
- filer *string
- filerBucketsPath *string
- port *int
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
}
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
- s3StandaloneOptions.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
}
var cmdS3 = &Command{
- UsageLine: "s3 -port=8333 -filer=<ip:port>",
+ UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
Short: "start a s3 API compatible server that is backed by a filer",
Long: `start a s3 API compatible server that is backed by a filer.
+ By default, you can use any access key and secret key to access the S3 APIs.
+ To enable credential based access, create a config.json file similar to this:
+
+{
+ "identities": [
+ {
+ "name": "some_name",
+ "credentials": [
+ {
+ "accessKey": "some_access_key1",
+ "secretKey": "some_secret_key1"
+ }
+ ],
+ "actions": [
+ "Admin",
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "some_read_only_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key2",
+ "secretKey": "some_secret_key2"
+ }
+ ],
+ "actions": [
+ "Read"
+ ]
+ },
+ {
+ "name": "some_normal_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key3",
+ "secretKey": "some_secret_key3"
+ }
+ ],
+ "actions": [
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "user_limited_to_bucket1",
+ "credentials": [
+ {
+ "accessKey": "some_access_key4",
+ "secretKey": "some_secret_key4"
+ }
+ ],
+ "actions": [
+ "Read:bucket1",
+ "Write:bucket1"
+ ]
+ }
+ ]
+}
+
`,
}
@@ -55,20 +118,44 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
}
+ filerBucketsPath := "/buckets"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerBucketsPath = resp.DirBuckets
+ glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: *s3opt.filer,
FilerGrpcAddress: filerGrpcAddress,
+ Config: *s3opt.config,
DomainName: *s3opt.domainName,
- BucketsPath: *s3opt.filerBucketsPath,
- GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ BucketsPath: filerBucketsPath,
+ GrpcDialOption: grpcDialOption,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
@@ -83,12 +170,12 @@ func (s3opt *S3Options) startS3Server() bool {
}
if *s3opt.tlsPrivateKey != "" {
- glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3opt.port)
+ glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
} else {
- glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3opt.port)
+ glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
if err = httpS.Serve(s3ApiListener); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index ab658735f..b199f2d2d 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -18,7 +18,7 @@ var cmdScaffold = &Command{
For example, the filer.toml mysql password can be overwritten by environment variable
export WEED_MYSQL_PASSWORD=some_password
Environment variable rules:
- * Prefix fix with "WEED_"
+ * Prefix the variable name with "WEED_"
* Upppercase the reset of variable name.
* Replace '.' with '_'
@@ -74,7 +74,12 @@ const (
# with http DELETE, by default the filer would check whether a folder is empty.
# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
recursive_delete = false
-
+# directories under this folder will be automatically creating a separate bucket
+buckets_folder = "/buckets"
+buckets_fsync = [ # a list of buckets with all write requests fsync=true
+ "important_bucket",
+ "should_always_fsync",
+]
####################################################
# The following are filer store options
@@ -136,13 +141,13 @@ hosts=[
"localhost:9042",
]
-[redis]
+[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
-[redis_cluster]
+[redis_cluster2]
enabled = false
addresses = [
"localhost:30001",
@@ -163,11 +168,11 @@ enabled = false
servers = "localhost:2379"
timeout = "3s"
-[tikv]
+[mongodb]
enabled = false
-pdAddress = "192.168.199.113:2379"
-
-
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
`
NOTIFICATION_TOML_EXAMPLE = `
@@ -262,6 +267,7 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials fil
region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
+endpoint = ""
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -323,6 +329,10 @@ key = ""
cert = ""
key = ""
+[grpc.msg_broker]
+cert = ""
+key = ""
+
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
[grpc.client]
@@ -352,15 +362,19 @@ key = ""
[master.maintenance]
# periodically run these scripts are the same as running them from 'weed shell'
scripts = """
+ lock
ec.encode -fullPercent=95 -quietFor=1h
ec.rebuild -force
ec.balance -force
volume.balance -force
+ volume.fix.replication
+ unlock
"""
sleep_minutes = 17 # sleep minutes between each script execution
[master.filer]
-default_filer_url = "http://localhost:8888/"
+default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
+
[master.sequencer]
type = "memory" # Choose [memory|etcd] type for storing the file id sequence
@@ -378,13 +392,27 @@ sequencer_etcd_urls = "http://127.0.0.1:2379"
aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
+ endpoint = ""
# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+# 000 has only one copy, copy_1
+# 010 and 001 has two copies, copy_2
+# 011 has only 3 copies, copy_3
[master.volume_growth]
-count_1 = 7 # create 1 x 7 = 7 actual volumes
-count_2 = 6 # create 2 x 6 = 12 actual volumes
-count_3 = 3 # create 3 x 3 = 9 actual volumes
-count_other = 1 # create n x 1 = n actual volumes
+copy_1 = 7 # create 1 x 7 = 7 actual volumes
+copy_2 = 6 # create 2 x 6 = 12 actual volumes
+copy_3 = 3 # create 3 x 3 = 9 actual volumes
+copy_other = 1 # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
`
)
diff --git a/weed/command/server.go b/weed/command/server.go
index 6aa68b6d2..c006f00eb 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -18,10 +18,11 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ msgBrokerOptions MessageBrokerOptions
)
func init() {
@@ -45,7 +46,7 @@ var cmdServer = &Command{
}
var (
- serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
+ serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
@@ -53,10 +54,11 @@ var (
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
+ volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
)
@@ -78,10 +80,10 @@ func init() {
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
- filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
+ filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -92,11 +94,13 @@ func init() {
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
- s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+
+ msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
}
@@ -114,11 +118,10 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- if *filerOptions.redirectOnRead {
+ if *isStartingS3 {
*isStartingFiler = true
}
-
- if *isStartingS3 {
+ if *isStartingMsgBroker {
*isStartingFiler = true
}
@@ -129,13 +132,15 @@ func runServer(cmd *Command, args []string) bool {
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
filerOptions.masters = &peers
- filerOptions.ip = serverBindIp
+ filerOptions.ip = serverIp
+ filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
serverOptions.v.masters = &peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
+ msgBrokerOptions.ip = serverIp
serverOptions.v.pulseSeconds = pulseSeconds
masterOptions.pulseSeconds = pulseSeconds
@@ -148,6 +153,7 @@ func runServer(cmd *Command, args []string) bool {
filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
s3Options.filer = &filerAddress
+ msgBrokerOptions.filer = &filerAddress
if *filerOptions.defaultReplicaPlacement == "" {
*filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
@@ -191,6 +197,13 @@ func runServer(cmd *Command, args []string) bool {
}()
}
+ if *isStartingMsgBroker {
+ go func() {
+ time.Sleep(2 * time.Second)
+ msgBrokerOptions.startQueueServer()
+ }()
+ }
+
// start volume server
{
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
diff --git a/weed/command/shell.go b/weed/command/shell.go
index dcf70608f..6dd768f47 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -9,14 +9,14 @@ import (
)
var (
- shellOptions shell.ShellOptions
- shellInitialFilerUrl *string
+ shellOptions shell.ShellOptions
+ shellInitialFiler *string
)
func init() {
cmdShell.Run = runShell // break init cycle
shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
- shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url")
+ shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
}
var cmdShell = &Command{
@@ -32,12 +32,13 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- var filerPwdErr error
- shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
- if filerPwdErr != nil {
- fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
+ var err error
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ if err != nil {
+ fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
return false
}
+ shellOptions.Directory = "/"
shell.RunShell(shellOptions)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index d71046131..358897aee 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -16,14 +16,15 @@ var (
)
type UploadOptions struct {
- master *string
- dir *string
- include *string
- replication *string
- collection *string
- dataCenter *string
- ttl *string
- maxMB *int
+ master *string
+ dir *string
+ include *string
+ replication *string
+ collection *string
+ dataCenter *string
+ ttl *string
+ maxMB *int
+ usePublicUrl *bool
}
func init() {
@@ -37,6 +38,7 @@ func init() {
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
+ upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
}
var cmdUpload = &Command{
@@ -79,9 +81,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts,
- *upload.replication, *upload.collection, *upload.dataCenter,
- *upload.ttl, *upload.maxMB)
+ results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -98,9 +98,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts,
- *upload.replication, *upload.collection, *upload.dataCenter,
- *upload.ttl, *upload.maxMB)
+ results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
diff --git a/weed/command/version.go b/weed/command/version.go
index 8fdd68ec8..9caf7dc4e 100644
--- a/weed/command/version.go
+++ b/weed/command/version.go
@@ -19,6 +19,6 @@ func runVersion(cmd *Command, args []string) bool {
cmd.Usage()
}
- fmt.Printf("version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
return true
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 9d665d143..f942ec50b 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -10,9 +10,11 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/spf13/viper"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util/httpdown"
@@ -56,7 +58,7 @@ func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
- v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
+ v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
@@ -83,7 +85,7 @@ var cmdVolume = &Command{
var (
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
+ maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
)
@@ -92,7 +94,7 @@ func runVolume(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
runtime.GOMAXPROCS(runtime.NumCPU())
- util.SetupProfiling(*v.cpuProfile, *v.memProfile)
+ grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption)
@@ -126,7 +128,8 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
if *v.ip == "" {
- *v.ip = "127.0.0.1"
+ *v.ip = util.DetectedHostAddress()
+ glog.V(0).Infof("detected volume server ip address: %v", *v.ip)
}
if *v.publicPort == 0 {
@@ -181,7 +184,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
clusterHttpServer := v.startClusterHttpService(volumeMux)
stopChain := make(chan struct{})
- util.OnInterrupt(func() {
+ grace.OnInterrupt(func() {
fmt.Println("volume server has be killed")
var startTime time.Time
@@ -234,7 +237,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
go func() {
@@ -247,7 +250,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
@@ -274,7 +277,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
diff --git a/weed/command/watch.go b/weed/command/watch.go
new file mode 100644
index 000000000..b46707a62
--- /dev/null
+++ b/weed/command/watch.go
@@ -0,0 +1,65 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ cmdWatch.Run = runWatch // break init cycle
+}
+
+var cmdWatch = &Command{
+ UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]",
+ Short: "see recent changes on a filer",
+ Long: `See recent changes on a filer.
+
+ `,
+}
+
+var (
+ watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+)
+
+func runWatch(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "watch",
+ PathPrefix: *watchTarget,
+ SinceNs: time.Now().Add(-*watchStart).UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ fmt.Printf("events: %+v\n", resp.EventNotification)
+ }
+
+ })
+ if watchErr != nil {
+ fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
+ }
+
+ return true
+}
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 0e6f89040..b9676c909 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -1,13 +1,17 @@
package command
import (
+ "context"
"fmt"
"net/http"
+ "os"
"os/user"
"strconv"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -23,6 +27,8 @@ type WebDavOption struct {
collection *string
tlsPrivateKey *string
tlsCertificate *string
+ cacheDir *string
+ cacheSizeMB *int64
}
func init() {
@@ -32,11 +38,13 @@ func init() {
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
+ webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
}
var cmdWebDav = &Command{
UsageLine: "webdav -port=7333 -filer=<ip:port>",
- Short: "<unstable> start a webdav server that is backed by a filer",
+ Short: "start a webdav server that is backed by a filer",
Long: `start a webdav server that is backed by a filer.
`,
@@ -46,7 +54,7 @@ func runWebDav(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port)
+ glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.Version(), *webDavStandaloneOptions.port)
return webDavStandaloneOptions.startWebDav()
@@ -54,12 +62,6 @@ func runWebDav(cmd *Command, args []string) bool {
func (wo *WebDavOption) startWebDav() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
-
// detect current user
uid, gid := uint32(0), uint32(0)
if u, err := user.Current(); err == nil {
@@ -71,13 +73,45 @@ func (wo *WebDavOption) startWebDav() bool {
}
}
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var cipher bool
+ // connect to filer
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ break
+ }
+ }
+
ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
Filer: *wo.filer,
FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ GrpcDialOption: grpcDialOption,
Collection: *wo.collection,
Uid: uid,
Gid: gid,
+ Cipher: cipher,
+ CacheDir: *wo.cacheDir,
+ CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)
@@ -92,12 +126,12 @@ func (wo *WebDavOption) startWebDav() bool {
}
if *wo.tlsPrivateKey != "" {
- glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.VERSION, *wo.port)
+ glog.V(0).Infof("Start Seaweed WebDav Server %s at https port %d", util.Version(), *wo.port)
if err = httpS.ServeTLS(webDavListener, *wo.tlsCertificate, *wo.tlsPrivateKey); err != nil {
glog.Fatalf("WebDav Server Fail to serve: %v", err)
}
} else {
- glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.VERSION, *wo.port)
+ glog.V(0).Infof("Start Seaweed WebDav Server %s at http port %d", util.Version(), *wo.port)
if err = httpS.Serve(webDavListener); err != nil {
glog.Fatalf("WebDav Server Fail to serve: %v", err)
}